16.8. プロデューサーおよびコンシューマーをトレース用にインストルメント化


アプリケーションコードを計測して、Kafka プロデューサーとコンシューマーでのトレースを有効にします。デコレーターパターンまたはインターセプターを使用して、Java プロデューサーおよびコンシューマーアプリケーションコードをトレース用にインストルメント化します。続いて、メッセージが生成されたとき、またはトピックから取得されたときにトレースを記録できます。

OpenTelemetry インストルメント化プロジェクトは、プロデューサーとコンシューマーのインストルメント化をサポートするクラスを提供します。

デコレーターのインストルメント化
デコレーターのインストルメント化では、トレース用に変更したプロデューサーまたはコンシューマーインスタンスを作成します。
インターセプターのインストルメント化
インターセプターのインストルメント化の場合、トレース機能をコンシューマーまたはプロデューサーの設定に追加します。

前提条件

  • クライアントのトレースを初期化 している。

    トレース JAR を依存関係としてプロジェクトに追加して、プロデューサーアプリケーションとコンシューマーアプリケーションでインストルメント化を有効にしている。

手順

各プロデューサーおよびコンシューマーアプリケーションのアプリケーションコードで、これらの手順を実行します。デコレーターパターンまたはインターセプターのいずれかを使用して、クライアントアプリケーションコードをインストルメント化します。

  • デコレーターパターンを使用するには、変更したプロデューサーまたはコンシューマーインスタンスを作成して、メッセージを送受信します。

    元の KafkaProducer または KafkaConsumer クラスを渡します。

    OpenTelemetry のデコレーターインストルメント化の例

    // Producer instance
    Producer < String, String > op = new KafkaProducer < > (
        configs,
        new StringSerializer(),
        new StringSerializer()
        );
        Producer < String, String > producer = tracing.wrap(op);
    KafkaTracing tracing = KafkaTracing.create(GlobalOpenTelemetry.get());
    producer.send(...);
    
    //consumer instance
    Consumer<String, String> oc = new KafkaConsumer<>(
        configs,
        new StringDeserializer(),
        new StringDeserializer()
        );
        Consumer<String, String> consumer = tracing.wrap(oc);
    consumer.subscribe(Collections.singleton("mytopic"));
    ConsumerRecords<Integer, String> records = consumer.poll(1000);
    ConsumerRecord<Integer, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);

  • インターセプターを使用するには、プロデューサーまたはコンシューマーの設定でインターセプタークラスを設定します。

    通常の方法で KafkaProducer クラスと KafkaConsumer クラスを使用します。TracingProducerInterceptor および TracingConsumerInterceptor インターセプタークラスは、トレース機能を処理します。

    インターセプターを使用したプロデューサー設定の例

    senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        TracingProducerInterceptor.class.getName());
    
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
    producer.send(...);

    インターセプターを使用したコンシューマー設定の例

    consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        TracingConsumerInterceptor.class.getName());
    
    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Collections.singletonList("messages"));
    ConsumerRecords<Integer, String> records = consumer.poll(1000);
    ConsumerRecord<Integer, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);

Red Hat logoGithubRedditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

© 2024 Red Hat, Inc.