17.9. 추적을 위한 Kafka Streams 애플리케이션 조정
Kafka Streams API 애플리케이션에서 추적을 활성화하는 애플리케이션 코드입니다. 데코레이터 패턴 또는 인터셉터를 사용하여 추적을 위해 Kafka Streams API 애플리케이션을 계측합니다. 그런 다음 메시지가 생성되거나 주제에서 검색될 때 추적을 기록할 수 있습니다.
- 데코레이터 계측
-
데코레이터 계측을 위해 추적을 위해 수정된 Kafka Streams 인스턴스를 생성합니다. OpenTelemetry의 경우 Kafka Streams에 추적 계측을 제공하기 위해 사용자 정의
TracingKafkaClientSupplier
클래스를 생성해야 합니다. - 인터셉터 계측
- 인터셉터 계측의 경우 Kafka Streams 생산자 및 소비자 구성에 추적 기능을 추가합니다.
사전 요구 사항
추적 JAR을 프로젝트에 종속 항목으로 추가하여 Kafka Streams 애플리케이션에서 계측을 활성화합니다.
-
OpenTelemetry를 사용하여 Kafka Streams를 조정하려면 사용자 정의
TracingKafkaClientSupplier
를 작성해야 합니다. 사용자 정의
TracingKafkaClientSupplier
는 Kafka의DefaultKafkaClientSupplier
를 확장하여 생산자 및 소비자 생성 방법을 재정의하여 Telemetry 관련 코드로 인스턴스를 래핑할 수 있습니다.사용자 정의
TracingKafkaClientSupplier
의 예private class TracingKafkaClientSupplier extends DefaultKafkaClientSupplier { @Override public Producer<byte[], byte[]> getProducer(Map<String, Object> config) { KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get()); return telemetry.wrap(super.getProducer(config)); } @Override public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) { KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get()); return telemetry.wrap(super.getConsumer(config)); } @Override public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) { return this.getConsumer(config); } @Override public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> config) { return this.getConsumer(config); } }
프로세스
각 Kafka Streams API 애플리케이션에 대해 다음 단계를 수행합니다.
데코레이터 패턴을 사용하려면
TracingKafkaClientSupplier
공급자 인터페이스 인스턴스를 생성한 다음 공급자 인터페이스를KafkaStreams
에 제공합니다.데코레이터 계측 예
KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer); KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier); streams.start();
인터셉터를 사용하려면 Kafka Streams 생산자 및 소비자 구성에서 interceptor 클래스를 설정합니다.
TracingProducerInterceptor
및TracingConsumerInterceptor
클래스는 추적 기능을 처리합니다.인터셉터를 사용한 생산자 및 소비자 구성의 예
props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());