21.3. 分散トレースの設定


カスタムリソースでトレースタイプを指定して、Kafka コンポーネントで分散トレースを有効にします。メッセージをエンドツーエンドで追跡するために Kafka クライアントにトレーサーをインストルメント化します。

分散トレースを設定するには、次の手順を順番に実行します。

21.3.1. 前提条件

分散トレースを設定する前に、Jaeger バックエンドコンポーネントが OpenShift クラスターにデプロイされていることを確認してください。OpenShift クラスターに Jaeger をデプロイするには、Jaeger Operator を使用することを推奨します。

デプロイメント手順は、Jaeger のドキュメント を参照してください。

注記

AMQ Streams 以外のアプリケーションおよびシステムにトレースを設定する方法については、このコンテンツの対象外となります。

21.3.2. MirrorMaker、Kafka Connect、および Kafka Bridge リソースでのトレーシングの有効化

分散トレースは、MirrorMaker、MirrorMaker 2、Kafka Connect、および AMQ Streams Kafka Bridge でサポートされています。コンポーネントのカスタムリソースを設定して、トレーサーサービスを指定して有効にします。

リソースでトレースを有効にすると、次のイベントがトリガーされます。

  • インターセプタークラスは、コンポーネントの統合コンシューマーとプロデューサーで更新されます。
  • MirrorMaker、MirrorMaker 2、および Kafka Connect の場合、トレースエージェントは、リソースで定義されたトレース設定に基づいてトレーサーを初期化します。
  • Kafka Bridge の場合、リソースで定義されたトレース設定に基づくトレーサーは、Kafka Bridge 自体によって初期化されます。

OpenTelemetry または OpenTracing を使用するトレースを有効にできます。

MirrorMaker および MirrorMaker 2 でのトレース

MirrorMaker および MirrorMaker 2 の場合、メッセージはソースクラスターからターゲットクラスターまでトレースされます。トレースデータは、MirrorMaker または MirrorMaker 2 コンポーネントに出入りするメッセージを記録します。

Kafka Connect でのトレーシング

Kafka Connect の場合、Kafka Connect によって生成および消費されたメッセージのみがトレースされます。Kafka Connect と外部システム間で送信されるメッセージをトレースするには、これらのシステムのコネクターでトレースを設定する必要があります。

Kafka Bridge でのトレーシング

Kafka Bridge の場合、Kafka Bridge によって生成および消費されるメッセージがトレースされます。Kafka Bridge を介してメッセージを送受信するクライアントアプリケーションから受信する HTTP リクエストもトレーシングされます。エンドツーエンドのトレーシングを設定するために、HTTP クライアントでトレーシングを設定する必要があります。

手順

以下の手順を、KafkaMirrorMakerKafkaMirrorMaker2KafkaConnect、および KafkaBridge リソースごとに実行します。

  1. spec.template プロパティーで、トレーサーサービスを設定します。

    • トレーシング環境変数 をテンプレートの設定プロパティーとして使用します。
    • OpenTelemetry の場合、spec.tracing.type プロパティーを opentelemetry に設定します。
    • OpenTracing の場合、spec.tracing.type プロパティーを jaeger に設定します。

    OpenTelemetry を使用した Kafka Connect のトレース設定の例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
    spec:
      #...
      template:
        connectContainer:
          env:
            - name: OTEL_SERVICE_NAME
              value: my-otel-service
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              value: "http://otlp-host:4317"
      tracing:
        type: opentelemetry
      #...
    Copy to Clipboard Toggle word wrap

    OpenTelemetry を使用した MirrorMaker のトレース設定の例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker
    metadata:
      name: my-mirror-maker
    spec:
      #...
      template:
        mirrorMakerContainer:
          env:
            - name: OTEL_SERVICE_NAME
              value: my-otel-service
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              value: "http://otlp-host:4317"
      tracing:
        type: opentelemetry
    #...
    Copy to Clipboard Toggle word wrap

    OpenTelemetry を使用した MirrorMaker 2 のトレース設定の例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mm2-cluster
    spec:
      #...
      template:
        connectContainer:
          env:
            - name: OTEL_SERVICE_NAME
              value: my-otel-service
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              value: "http://otlp-host:4317"
      tracing:
        type: opentelemetry
    #...
    Copy to Clipboard Toggle word wrap

    OpenTelemetry を使用した Kafka Bridge のトレース設定の例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaBridge
    metadata:
      name: my-bridge
    spec:
      #...
      template:
        bridgeContainer:
          env:
            - name: OTEL_SERVICE_NAME
              value: my-otel-service
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              value: "http://otlp-host:4317"
      tracing:
        type: opentelemetry
    #...
    Copy to Clipboard Toggle word wrap

    OpenTracing を使用した Kafka Connect のトレース設定の例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
    spec:
      #...
      template:
        connectContainer:
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing:
        type: jaeger
      #...
    Copy to Clipboard Toggle word wrap

    OpenTracing を使用した MirrorMaker のトレース設定の例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker
    metadata:
      name: my-mirror-maker
    spec:
      #...
      template:
        mirrorMakerContainer:
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing:
        type: jaeger
    #...
    Copy to Clipboard Toggle word wrap

    OpenTracing を使用した MirrorMaker 2 のトレース設定の例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mm2-cluster
    spec:
      #...
      template:
        connectContainer:
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing:
        type: jaeger
    #...
    Copy to Clipboard Toggle word wrap

    OpenTracing を使用した Kafka Bridge のトレース設定の例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaBridge
    metadata:
      name: my-bridge
    spec:
      #...
      template:
        bridgeContainer:
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing:
        type: jaeger
    #...
    Copy to Clipboard Toggle word wrap

  2. リソースを作成または更新します。

    oc apply -f <resource_configuration_file>
    Copy to Clipboard Toggle word wrap

21.3.3. Kafka クライアントのトレースの初期化

トレーサーを初期化し、分散トレース用にクライアントアプリケーションをインストルメント化します。Kafka プロデューサークライアントとコンシューマークライアント、および Kafka Streams API アプリケーションをインストルメント化できます。OpenTracing または OpenTelemetry のトレーサーを初期化できます。

一連の トレース環境変数 を使用して、トレーサーを設定および初期化します。

手順

各クライアントアプリケーションで、トレーサーの依存関係を追加します。

  1. クライアントアプリケーションの pom.xml ファイルに Maven 依存関係を追加します。

    OpenTelemetry の依存関係

    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
        <version>1.19.0.redhat-00002</version>
    </dependency>
    <dependency>
      <groupId>io.opentelemetry.instrumentation</groupId>
      <artifactId>opentelemetry-kafka-clients-{OpenTelemetryKafkaClient}</artifactId>
      <version>1.19.0.redhat-00002</version>
    </dependency>
    <dependency>
      <groupId>io.opentelemetry</groupId>
      <artifactId>opentelemetry-exporter-otlp</artifactId>
      <version>1.19.0.redhat-00002</version>
    </dependency>
    Copy to Clipboard Toggle word wrap

    OpenTracing の依存関係

    <dependency>
        <groupId>io.jaegertracing</groupId>
        <artifactId>jaeger-client</artifactId>
        <version>1.8.1.redhat-00002</version>
    </dependency>
    <dependency>
      <groupId>io.opentracing.contrib</groupId>
      <artifactId>opentracing-kafka-client</artifactId>
      <version>0.1.15.redhat-00006</version>
    </dependency>
    Copy to Clipboard Toggle word wrap

  2. トレース環境変数 を使用して、トレーサーの設定を定義します。
  3. 環境変数で初期化されるトレーサーを作成します。

    OpenTelemetry のトレーサーの作成

    OpenTelemetry ot = GlobalOpenTelemetry.get();
    Copy to Clipboard Toggle word wrap

    OpenTracing のトレーサーの作成

    Tracer tracer = Configuration.fromEnv().getTracer();
    Copy to Clipboard Toggle word wrap

  4. トレーサーをグローバルトレーサーとして登録します。

    GlobalTracer.register(tracer);
    Copy to Clipboard Toggle word wrap
  5. クライアントをインストルメント化します。

21.3.4. Kafka プロデューサーおよびコンシューマーをトレース用にインストルメント化

アプリケーションコードを計測して、Kafka プロデューサーとコンシューマーでのトレースを有効にします。デコレーターパターンまたはインターセプターを使用して、Java プロデューサーおよびコンシューマーアプリケーションコードをトレース用にインストルメント化します。続いて、メッセージが生成されたとき、またはトピックから取得されたときにトレースを記録できます。

OpenTelemetry および OpenTracing インストルメント化プロジェクトは、プロデューサーとコンシューマーのインストルメント化をサポートするクラスを提供します。

デコレーターのインストルメント化
デコレーターのインストルメント化では、トレース用に変更したプロデューサーまたはコンシューマーインスタンスを作成します。OpenTelemetry と OpenTracing では、デコレーターのインストルメント化が異なります。
インターセプターのインストルメント化
インターセプターのインストルメント化の場合、トレース機能をコンシューマーまたはプロデューサーの設定に追加します。インターセプターのインストルメント化は、OpenTelemetry と OpenTracing で同じです。

前提条件

  • クライアントのトレースを初期化 している。

    トレース JAR を依存関係としてプロジェクトに追加して、プロデューサーアプリケーションとコンシューマーアプリケーションでインストルメント化を有効にしている。

手順

各プロデューサーおよびコンシューマーアプリケーションのアプリケーションコードで、これらの手順を実行します。デコレーターパターンまたはインターセプターのいずれかを使用して、クライアントアプリケーションコードをインストルメント化します。

  • デコレーターパターンを使用するには、変更したプロデューサーまたはコンシューマーインスタンスを作成して、メッセージを送受信します。

    元の KafkaProducer または KafkaConsumer クラスを渡します。

    OpenTelemetry のデコレーターインストルメント化の例

    // Producer instance
    Producer < String, String > op = new KafkaProducer < > (
        configs,
        new StringSerializer(),
        new StringSerializer()
        );
        Producer < String, String > producer = tracing.wrap(op);
    KafkaTracing tracing = KafkaTracing.create(GlobalOpenTelemetry.get());
    producer.send(...);
    
    //consumer instance
    Consumer<String, String> oc = new KafkaConsumer<>(
        configs,
        new StringDeserializer(),
        new StringDeserializer()
        );
        Consumer<String, String> consumer = tracing.wrap(oc);
    consumer.subscribe(Collections.singleton("mytopic"));
    ConsumerRecords<Integer, String> records = consumer.poll(1000);
    ConsumerRecord<Integer, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
    Copy to Clipboard Toggle word wrap

    OpenTracing のデコレーターインストルメント化の例

    //producer instance
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
    TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer, tracer);
    TracingKafkaProducer.send(...)
    
    //consumer instance
    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
    TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer, tracer);
    tracingConsumer.subscribe(Collections.singletonList("mytopic"));
    ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000);
    ConsumerRecord<Integer, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
    Copy to Clipboard Toggle word wrap

  • インターセプターを使用するには、プロデューサーまたはコンシューマーの設定でインターセプタークラスを設定します。

    通常の方法で KafkaProducer クラスと KafkaConsumer クラスを使用します。TracingProducerInterceptor および TracingConsumerInterceptor インターセプタークラスは、トレース機能を処理します。

    インターセプターを使用したプロデューサー設定の例

    senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        TracingProducerInterceptor.class.getName());
    
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
    producer.send(...);
    Copy to Clipboard Toggle word wrap

    インターセプターを使用したコンシューマー設定の例

    consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        TracingConsumerInterceptor.class.getName());
    
    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Collections.singletonList("messages"));
    ConsumerRecords<Integer, String> records = consumer.poll(1000);
    ConsumerRecord<Integer, String> record = ...
    SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
    Copy to Clipboard Toggle word wrap

21.3.5. Kafka Streams アプリケーションのトレース用のインストルメント化

アプリケーションコードを計測して、Kafka Streams API アプリケーションでのトレースを有効にします。デコレーターパターンまたはインターセプターを使用して、トレース用に Kafka Streams API アプリケーションをインストルメント化します。続いて、メッセージが生成されたとき、またはトピックから取得されたときにトレースを記録できます。

デコレーターのインストルメント化
デコレーターのインストルメント化は、トレース用に変更した Kafka Streams インスタンスを作成します。OpenTracing インストルメント化プロジェクトは、Kafka Streams のインストルメント化をサポートする TracingKafkaClientSupplier クラスを提供します。TracingKafkaClientSupplier サプライヤーインターフェイスのインスタンスをラップして作成し、Kafka Streams のトレースインストルメント化を行います。OpenTelemetry の場合、プロセスは同じですが、サポートを提供するためにカスタム TracingKafkaClientSupplier クラスを作成する必要があります。
インターセプターのインストルメント化
インターセプターインストルメント化の場合は、トレース機能を Kafka Streams プロデューサーおよびコンシューマー設定に追加します。

前提条件

  • クライアントのトレースを初期化 している。

    トレース JAR を依存関係としてプロジェクトに追加して、Kafka Streams アプリケーションでインストルメント化を有効にしている。

  • OpenTelemetry で Kafka Streams をインストルメント化するために、カスタムの TracingKafkaClientSupplier を記述している。
  • カスタム TracingKafkaClientSupplier が Kafka の DefaultKafkaClientSupplier を拡張し、プロデューサーとコンシューマーの作成メソッドを上書きして、インスタンスを Telemetry 関連のコードでラップできるようにしている。

    カスタム TracingKafkaClientSupplier の例

    private class TracingKafkaClientSupplier extends DefaultKafkaClientSupplier {
        @Override
        public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
            KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
            return telemetry.wrap(super.getProducer(config));
        }
    
        @Override
        public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
            KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());
            return telemetry.wrap(super.getConsumer(config));
        }
    
        @Override
        public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
            return this.getConsumer(config);
        }
    
        @Override
        public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> config) {
            return this.getConsumer(config);
        }
    }
    Copy to Clipboard Toggle word wrap

手順

Kafka Streams API アプリケーションごとにこの手順を実行します。

  • デコレーターパターンを使用するには、TracingKafkaClientSupplier サプライヤーインターフェイスのインスタンスを作成し、そのサプライヤーインターフェイスを KafkaStreams に提供します。

    デコレーターのインストルメント化の例

    KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);
    KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier);
    streams.start();
    Copy to Clipboard Toggle word wrap

  • インターセプターを使用するには、Kafka Streams プロデューサーおよびコンシューマー設定でインターセプタークラスを設定します。

    TracingProducerInterceptor および TracingConsumerInterceptor インターセプタークラスは、トレース機能を処理します。

    インターセプターを使用したプロデューサーとコンシューマーの設定例

    props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
    props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
    Copy to Clipboard Toggle word wrap

21.3.6. 別の OpenTelemetry トレースシステムの導入

デフォルトの OTLP システムの代わりに、OpenTelemetry でサポートされている他のトレースシステムを指定できます。これを行うには、AMQ Streams で提供される Kafka イメージに必要なアーティファクトを追加します。必要な実装固有の環境変数も設定する必要があります。次に、OTEL_TRACES_EXPORTER 環境変数を使用して、新しいトレースの実装を有効にします。

この手順では、Zipkin トレースを実装する方法を示します。

手順

  1. トレースアーティファクトを AMQ Streams Kafka イメージの /opt/kafka/libs/ ディレクトリーに追加します。

    新しいカスタムイメージを作成するための基本イメージとして、Red Hat Ecosystem Catalog の Kafka コンテナーイメージを使用できます。

    Zipkin の OpenTelemetry アーティファクト

    io.opentelemetry:opentelemetry-exporter-zipkin
    Copy to Clipboard Toggle word wrap

  2. 新しいトレース実装のトレースエクスポーターとエンドポイントを設定します。

    Zikpin トレーサーの設定例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mm2-cluster
    spec:
      #...
      template:
        connectContainer:
          env:
            - name: OTEL_SERVICE_NAME
              value: my-zipkin-service
            - name: OTEL_EXPORTER_ZIPKIN_ENDPOINT
              value: http://zipkin-exporter-host-name:9411/api/v2/spans 
    1
    
            - name: OTEL_TRACES_EXPORTER
              value: zipkin 
    2
    
      tracing:
        type: opentelemetry
    #...
    Copy to Clipboard Toggle word wrap

    1
    接続先の Zipkin エンドポイントを指定します。
    2
    Zipkin エクスポーター。

21.3.7. カスタムスパン名

トレース スパン は Jaeger の論理作業単位で、操作名、開始時間、および期間が含まれます。スパンには組み込みの名前がありますが、使用する Kafka クライアントインストルメント化で、カスタムスパン名を指定できます。

カスタムスパン名の指定はオプションであり、プロデューサーおよびコンシューマークライアントインストルメント化 または Kafka Streams インストルメント化 でデコレーターパターンを使用する場合にのみ適用されます。

21.3.7.1. OpenTelemetry のスパン名の指定

OpenTelemetry でカスタムスパン名を直接指定できません。代わりに、コードをクライアントアプリケーションに追加してスパン名を取得し、追加のタグと属性を抽出します。

属性を抽出するコード例

//Defines attribute extraction for a producer
private static class ProducerAttribExtractor implements AttributesExtractor < ProducerRecord < ? , ? > , Void > {
    @Override
    public void onStart(AttributesBuilder attributes, ProducerRecord < ? , ? > producerRecord) {
        set(attributes, AttributeKey.stringKey("prod_start"), "prod1");
    }
    @Override
    public void onEnd(AttributesBuilder attributes, ProducerRecord < ? , ? > producerRecord, @Nullable Void unused, @Nullable Throwable error) {
        set(attributes, AttributeKey.stringKey("prod_end"), "prod2");
    }
}
//Defines attribute extraction for a consumer
private static class ConsumerAttribExtractor implements AttributesExtractor < ConsumerRecord < ? , ? > , Void > {
    @Override
    public void onStart(AttributesBuilder attributes, ConsumerRecord < ? , ? > producerRecord) {
        set(attributes, AttributeKey.stringKey("con_start"), "con1");
    }
    @Override
    public void onEnd(AttributesBuilder attributes, ConsumerRecord < ? , ? > producerRecord, @Nullable Void unused, @Nullable Throwable error) {
        set(attributes, AttributeKey.stringKey("con_end"), "con2");
    }
}
//Extracts the attributes
public static void main(String[] args) throws Exception {
        Map < String, Object > configs = new HashMap < > (Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));
        System.setProperty("otel.traces.exporter", "jaeger");
        System.setProperty("otel.service.name", "myapp1");
        KafkaTracing tracing = KafkaTracing.newBuilder(GlobalOpenTelemetry.get())
            .addProducerAttributesExtractors(new ProducerAttribExtractor())
            .addConsumerAttributesExtractors(new ConsumerAttribExtractor())
            .build();
Copy to Clipboard Toggle word wrap

21.3.7.2. OpenTracing のスパン名の指定

OpenTracing のカスタムスパン名を指定するには、プロデューサーとコンシューマーをインストルメント化するときに BiFunction オブジェクトを追加の引数として渡します。

組み込みの名前とカスタムスパン名を指定して、デコレーターパターンでクライアントアプリケーションコードをインストルメント化する方法の詳細は、OpenTracing Apache Kafka client instrumentation を参照してください。

トップに戻る
Red Hat logoGithubredditYoutubeTwitter

詳細情報

試用、購入および販売

コミュニティー

Red Hat ドキュメントについて

Red Hat をお使いのお客様が、信頼できるコンテンツが含まれている製品やサービスを活用することで、イノベーションを行い、目標を達成できるようにします。 最新の更新を見る.

多様性を受け入れるオープンソースの強化

Red Hat では、コード、ドキュメント、Web プロパティーにおける配慮に欠ける用語の置き換えに取り組んでいます。このような変更は、段階的に実施される予定です。詳細情報: Red Hat ブログ.

会社概要

Red Hat は、企業がコアとなるデータセンターからネットワークエッジに至るまで、各種プラットフォームや環境全体で作業を簡素化できるように、強化されたソリューションを提供しています。

Theme

© 2025 Red Hat