9.3. 추적기를 사용하여 Kafka 클라이언트 조정


분산 추적을 위해 Kafka 생산자 및 소비자 클라이언트 및 Kafka Streams API 애플리케이션입니다.

9.3.1. 추적을 위한 생산자 및 소비자 처리

Decorator 패턴 또는 인터셉터를 사용하여 추적을 위해 Java 생산자 및 소비자 애플리케이션 코드를 계측합니다.

절차

각 생산자 및 소비자 애플리케이션의 애플리케이션 코드에서 다음을 수행합니다.

  1. OpenTracing의 Maven 종속성을 생산자 또는 소비자의 pom.xml 파일에 추가합니다.

    <dependency>
        <groupId>io.opentracing.contrib</groupId>
        <artifactId>opentracing-kafka-client</artifactId>
        <version>0.1.15.redhat-00006</version>
    </dependency>
    Copy to Clipboard Toggle word wrap
  2. Decorator 패턴 또는 인터셉터를 사용하여 클라이언트 애플리케이션 코드를 계측합니다.

    • Decorator 패턴을 사용하려면 다음을 수행합니다.

      // Create an instance of the KafkaProducer:
      KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
      
      // Create an instance of the TracingKafkaProducer:
      TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
              tracer);
      
      // Send:
      tracingProducer.send(...);
      
      // Create an instance of the KafkaConsumer:
      KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
      
      // Create an instance of the TracingKafkaConsumer:
      TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
              tracer);
      
      // Subscribe:
      tracingConsumer.subscribe(Collections.singletonList("messages"));
      
      // Get messages:
      ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000);
      
      // Retrieve SpanContext from polled record (consumer side):
      ConsumerRecord<Integer, String> record = ...
      SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
      Copy to Clipboard Toggle word wrap
    • 인터셉터를 사용하려면 다음을 수행합니다.

      // Register the tracer with GlobalTracer:
      GlobalTracer.register(tracer);
      
      // Add the TracingProducerInterceptor to the sender properties:
      senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                TracingProducerInterceptor.class.getName());
      
      // Create an instance of the KafkaProducer:
      KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
      
      // Send:
      producer.send(...);
      
      // Add the TracingConsumerInterceptor to the consumer properties:
      consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                TracingConsumerInterceptor.class.getName());
      
      // Create an instance of the KafkaConsumer:
      KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
      
      // Subscribe:
      consumer.subscribe(Collections.singletonList("messages"));
      
      // Get messages:
      ConsumerRecords<Integer, String> records = consumer.poll(1000);
      
      // Retrieve the SpanContext from a polled message (consumer side):
      ConsumerRecord<Integer, String> record = ...
      SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
      Copy to Clipboard Toggle word wrap

9.3.1.1. Decorator 패턴의 사용자 정의 범위 이름

기간은 작업 이름, 시작 시간 및 기간이 있는 Jaeger의 논리적 작업 단위입니다.

생산자 및 소비자 애플리케이션을 계측하는 데 Decorator 패턴을 사용하려면 TracingKafkaProducerTracingKafkaConsumer 오브젝트를 생성할 때 BiFunction 오브젝트를 추가 인수로 전달하여 사용자 정의 범위 이름을 정의합니다. OpenTracing Apache Kafka Client Instrumentation 라이브러리에는 여러 개의 기본 제공 범위 이름이 포함되어 있습니다.

예: 사용자 정의 범위 이름을 사용하여 Decorator 패턴에서 클라이언트 애플리케이션 코드를 조정

// Create a BiFunction for the KafkaProducer that operates on (String operationName, ProducerRecord consumerRecord) and returns a String to be used as the name:

BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
    (operationName, producerRecord) -> "CUSTOM_PRODUCER_NAME";

// Create an instance of the KafkaProducer:
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);

// Create an instance of the TracingKafkaProducer
TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
        tracer,
        producerSpanNameProvider);

// Spans created by the tracingProducer will now have "CUSTOM_PRODUCER_NAME" as the span name.

// Create a BiFunction for the KafkaConsumer that operates on (String operationName, ConsumerRecord consumerRecord) and returns a String to be used as the name:

BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
    (operationName, consumerRecord) -> operationName.toUpperCase();

// Create an instance of the KafkaConsumer:
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);

// Create an instance of the TracingKafkaConsumer, passing in the consumerSpanNameProvider BiFunction:

TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
        tracer,
        consumerSpanNameProvider);

// Spans created by the tracingConsumer will have the operation name as the span name, in upper-case.
// "receive" -> "RECEIVE"
Copy to Clipboard Toggle word wrap

9.3.1.2. 기본 제공 범위 이름

사용자 지정 범위 이름을 정의할 때 ClientSpanNameProvider 클래스에서 다음 BiFunctions 를 사용할 수 있습니다. spanNameProvider 를 지정하지 않으면 CONSUMER_OPERATION_NAMEPRODUCER_OPERATION_NAME 이 사용됩니다.

Expand
BiFunction설명

CONSUMER_OPERATION_NAME, PRODUCER_OPERATION_NAME

user Name을 범위의 이름으로 반환합니다. 소비자의 경우 "receive" 및 생산자의 경우 "send"를 반환합니다.

CONSUMER_PREFIXED_OPERATION_NAME(String prefix), PRODUCER_PREFIXED_OPERATION_NAME(String prefix)

접두사operationName 의 문자열 연결을 반환합니다.

CONSUMER_TOPIC, PRODUCER_TOPIC

메시지가 전송된 대상의 이름 또는 형식 (record.topic()) 을 반환합니다.

PREFIXED_CONSUMER_TOPIC(String prefix), PREFIXED_PRODUCER_TOPIC(String prefix)

문자열 접두사 연결과 항목 이름 (record.topic()) 을 반환합니다.

CONSUMER_OPERATION_NAME_TOPIC, PRODUCER_OPERATION_NAME_TOPIC

작업 이름과 주제 이름을 반환합니다. "operationName - record.topic()".

CONSUMER_PREFIXED_OPERATION_NAME_TOPIC(String prefix), PRODUCER_PREFIXED_OPERATION_NAME_TOPIC(String prefix)

접두사"operationName - record.topic()" 문자열 연결을 반환합니다.

9.3.2. 추적을 위한 Kafka Streams 애플리케이션 조정

이 섹션에서는 분산 추적을 위해 Kafka Streams API 애플리케이션을 조정하는 방법을 설명합니다.

절차

각 Kafka Streams API 애플리케이션에서 다음을 수행합니다.

  1. Kafka Streams API 애플리케이션의 pom.xml 파일에 opentracing-kafka-streams 종속성을 추가합니다.

    <dependency>
        <groupId>io.opentracing.contrib</groupId>
        <artifactId>opentracing-kafka-streams</artifactId>
        <version>0.1.15.redhat-00006</version>
    </dependency>
    Copy to Clipboard Toggle word wrap
  2. TracingKafkaClientSupplier 공급자 인터페이스 인스턴스를 생성합니다.

    KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);
    Copy to Clipboard Toggle word wrap
  3. KafkaStreams 에 공급자 인터페이스를 제공합니다.

    KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier);
    streams.start();
    Copy to Clipboard Toggle word wrap
Red Hat logoGithubredditYoutubeTwitter

자세한 정보

평가판, 구매 및 판매

커뮤니티

Red Hat 문서 정보

Red Hat을 사용하는 고객은 신뢰할 수 있는 콘텐츠가 포함된 제품과 서비스를 통해 혁신하고 목표를 달성할 수 있습니다. 최신 업데이트를 확인하세요.

보다 포괄적 수용을 위한 오픈 소스 용어 교체

Red Hat은 코드, 문서, 웹 속성에서 문제가 있는 언어를 교체하기 위해 최선을 다하고 있습니다. 자세한 내용은 다음을 참조하세요.Red Hat 블로그.

Red Hat 소개

Red Hat은 기업이 핵심 데이터 센터에서 네트워크 에지에 이르기까지 플랫폼과 환경 전반에서 더 쉽게 작업할 수 있도록 강화된 솔루션을 제공합니다.

Theme

© 2026 Red Hat
맨 위로 이동