9.3. 设置分布式追踪
通过在自定义资源中指定追踪类型,在 Kafka 组件中启用分布式追踪。Kafka 客户端中的工具追踪器,用于端到端跟踪信息。
要设置分布式追踪,请按照以下步骤执行:
- 为 MirrorMaker、Kafka Connect 和 Kafka Bridge 启用追踪
为客户端设置追踪:
使用 tracers 检测客户端:
9.3.1. 先决条件 复制链接链接已复制到粘贴板!
在设置分布式追踪前,请确保 Jaeger 后端组件已部署到 OpenShift 集群。我们建议使用 Jaeger operator 在 OpenShift 集群上部署 Jaeger。
有关部署说明,请参阅 Jaeger 文档。
为 AMQ Streams 以外的应用程序和系统设置追踪不在此内容范围内。
MirrorMaker、MirrorMaker 2.0、Kafka Connect 和 AMQ Streams Kafka Bridge 支持分布式追踪。配置组件的自定义资源,以指定和启用 tracer 服务。
在资源中启用追踪会触发以下事件:
- 拦截器类在组件的集成使用者和生产者中更新。
- 对于 MirrorMaker、MirrorMaker 2.0 和 Kafka Connect,追踪代理根据资源中定义的追踪配置初始化 tracer。
- 对于 Kafka Bridge,基于资源中定义的追踪配置 tracer 由 Kafka Bridge 本身初始化。
您可以启用使用 OpenTelemetry 或 OpenTracing 的追踪。
MirrorMaker 和 MirrorMaker 2.0 中的追踪
对于 MirrorMaker 和 MirrorMaker 2.0,信息会根据源集群追踪到目标集群。trace 数据记录输入并离开 MirrorMaker 或 MirrorMaker 2.0 组件的消息。
Kafka Connect 中的追踪
对于 Kafka Connect,只有 Kafka Connect 生成并消耗的消息才会被追踪。要跟踪 Kafka Connect 和外部系统之间发送的消息,您必须在连接器中配置这些系统的追踪。
在 Kafka Bridge 中的追踪
对于 Kafka Bridge,Kafka Bridge 生成并消耗的消息会被追踪。另外还会跟踪来自客户端应用程序来发送和接收通过 Kafka Bridge 的信息的 HTTP 请求。要具有端到端追踪,您必须在 HTTP 客户端中配置追踪。
流程
为每个 KafkaMirrorMaker、KafkaMirrorMaker2、KafkaConnect 和 KafkaBridge 资源执行这些步骤。
在
spec.template属性中配置 tracer 服务。- 使用 追踪环境变量 作为模板配置属性。
-
对于 OpenTelemetry,将
spec.tracing.type属性设置为opentelemetry。 -
对于 OpenTracing,将
spec.tracing.type属性设置为jaeger。
使用 OpenTelemetry 的 Kafka Connect 的追踪配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster spec: #... template: connectContainer: env: - name: OTEL_SERVICE_NAME value: my-otel-service - name: OTEL_EXPORTER_JAEGER_ENDPOINT value: "http://jaeger-host:14250" tracing: type: opentelemetry #...使用 OpenTelemetry 的 MirrorMaker 的追踪配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker metadata: name: my-mirror-maker spec: #... template: mirrorMakerContainer: env: - name: OTEL_SERVICE_NAME value: my-otel-service - name: OTEL_EXPORTER_JAEGER_ENDPOINT value: "http://jaeger-host:14250" tracing: type: opentelemetry #...使用 OpenTelemetry 的 MirrorMaker 2.0 的追踪配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mm2-cluster spec: #... template: connectContainer: env: - name: OTEL_SERVICE_NAME value: my-otel-service - name: OTEL_EXPORTER_JAEGER_ENDPOINT value: "http://jaeger-host:14250" tracing: type: opentelemetry #...使用 OpenTelemetry 的 Kafka Bridge 的追踪配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaBridge metadata: name: my-bridge spec: #... template: bridgeContainer: env: - name: OTEL_SERVICE_NAME value: my-otel-service - name: OTEL_EXPORTER_JAEGER_ENDPOINT value: "http://jaeger-host:14250" tracing: type: opentelemetry #...使用 OpenTracing 进行 Kafka Connect 的追踪配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster spec: #... template: connectContainer: env: - name: JAEGER_SERVICE_NAME value: my-jaeger-service - name: JAEGER_AGENT_HOST value: jaeger-agent-name - name: JAEGER_AGENT_PORT value: "6831" tracing: type: jaeger #...使用 OpenTracing 的 MirrorMaker 的追踪配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker metadata: name: my-mirror-maker spec: #... template: mirrorMakerContainer: env: - name: JAEGER_SERVICE_NAME value: my-jaeger-service - name: JAEGER_AGENT_HOST value: jaeger-agent-name - name: JAEGER_AGENT_PORT value: "6831" tracing: type: jaeger #...使用 OpenTracing 的 MirrorMaker 2.0 的追踪配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mm2-cluster spec: #... template: connectContainer: env: - name: JAEGER_SERVICE_NAME value: my-jaeger-service - name: JAEGER_AGENT_HOST value: jaeger-agent-name - name: JAEGER_AGENT_PORT value: "6831" tracing: type: jaeger #...使用 OpenTracing 的 Kafka 网桥的追踪配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaBridge metadata: name: my-bridge spec: #... template: bridgeContainer: env: - name: JAEGER_SERVICE_NAME value: my-jaeger-service - name: JAEGER_AGENT_HOST value: jaeger-agent-name - name: JAEGER_AGENT_PORT value: "6831" tracing: type: jaeger #...创建或更新资源:
oc apply -f <resource_configuration_file>
9.3.3. 为 Kafka 客户端初始化追踪 复制链接链接已复制到粘贴板!
初始化 tracer,然后检测您的客户端应用程序以进行分布式追踪。您可以检测 Kafka producer 和使用者客户端,以及 Kafka Streams API 应用程序。您可以初始化 OpenTracing 或 OpenTelemetry 的 tracer。
使用一组 追踪环境变量 配置和初始化 tracer。
流程
在每个客户端应用程序中添加 tracer 的依赖项:
将 Maven 依赖项添加到客户端应用程序的
pom.xml文件中:OpenTelemetry 的依赖项
<dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId> <version>1.18.0-alpha</version> </dependency> <dependency> <groupId>io.opentelemetry.instrumentation</groupId> <artifactId>opentelemetry-kafka-clients-{OpenTelemetryKafkaClient}</artifactId> <version>1.18.0-alpha</version> </dependency> <dependency> <groupId>io.opentelemetry</groupId> <artifactId>opentelemetry-exporter-jaeger</artifactId> <version>1.18.0</version> </dependency>OpenTracing 的依赖项
<dependency> <groupId>io.jaegertracing</groupId> <artifactId>jaeger-client</artifactId> <version>1.3.2</version> </dependency> <dependency> <groupId>io.opentracing.contrib</groupId> <artifactId>opentracing-kafka-client</artifactId> <version>0.1.15</version> </dependency>- 使用追踪环境变量定义 tracer 的配置。
创建一个 tracer,它使用环境变量初始化:
为 OpenTelemetry 创建 tracer
OpenTelemetry ot = GlobalOpenTelemetry.get();为 OpenTracing 创建追踪器
Tracer tracer = Configuration.fromEnv().getTracer();将 tracer 注册为全局 tracer:
GlobalTracer.register(tracer);检测您的客户端:
9.3.4. 提取制作者和消费者以进行追踪 复制链接链接已复制到粘贴板!
在 Kafka 生产者和消费者中启用追踪的检测应用程序代码。使用 decorator 模式或拦截器工具您的 Java producer 和消费者应用程序代码进行追踪。然后,您可以在从主题生成或检索信息时记录跟踪。
OpenTelemetry 和 OpenTracing 检测项目提供支持生产者和消费者工具的类。
- decorator 工具
- 对于 decorator 检测,请创建修改的制作者或消费者实例以进行追踪。decorator 工具与 OpenTelemetry 和 OpenTracing 的不同。
- 拦截器工具
- 要进行拦截器检测,请将追踪功能添加到使用者或制作者配置中。拦截器工具与 OpenTelemetry 和 OpenTracing 相同。
先决条件
您已为 客户端 初始化追踪。
您可以通过在项目中添加追踪 JAR 作为依赖项来启用制作者和使用者应用。
流程
在各个制作者和消费者应用的应用代码中执行这些步骤。使用 decorator 模式或拦截器(拦截器)检测您的客户端应用程序代码。
要使用 decorator 模式,请创建修改后的制作者或消费者实例来发送或接收消息。
您传递了原始
KafkaProducer或KafkaConsumer类。OpenTelemetry 的 decorator 检测示例
// Producer instance Producer < String, String > op = new KafkaProducer < > ( configs, new StringSerializer(), new StringSerializer() ); Producer < String, String > producer = tracing.wrap(op); KafkaTracing tracing = KafkaTracing.create(GlobalOpenTelemetry.get()); producer.send(...); //consumer instance Consumer<String, String> oc = new KafkaConsumer<>( configs, new StringDeserializer(), new StringDeserializer() ); Consumer<String, String> consumer = tracing.wrap(oc); consumer.subscribe(Collections.singleton("mytopic")); ConsumerRecords<Integer, String> records = consumer.poll(1000); ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);OpenTracing 的 decorator 检测示例
//producer instance KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer, tracer); TracingKafkaProducer.send(...) //consumer instance KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer, tracer); tracingConsumer.subscribe(Collections.singletonList("mytopic")); ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000); ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);要使用拦截器,请在制作者或消费者配置中设置拦截器类。
您以通常的方式使用
KafkaProducer和KafkaConsumer类。TracingProducerInterceptor和TracingConsumerInterceptorinterceptor 类负责追踪功能。使用拦截器的制作者配置示例
senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); producer.send(...);使用拦截器的使用者配置示例
consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(Collections.singletonList("messages")); ConsumerRecords<Integer, String> records = consumer.poll(1000); ConsumerRecord<Integer, String> record = ... SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
9.3.5. 用于追踪的 Kafka Streams 应用程序 复制链接链接已复制到粘贴板!
在 Kafka Streams API 应用程序中启用追踪的检测应用程序代码。使用 decorator 模式或拦截器检测您的 Kafka Streams API 应用程序进行追踪。然后,您可以在从主题生成或检索信息时记录跟踪。
- decorator 工具
-
对于 decorator 工具,为追踪创建修改后的 Kafka Streams 实例。OpenTracing 工具项目提供
TracingKafkaClientSupplier类,它支持 Kafka 流的检测。您可以创建一个TracingKafkaClientSupplier供应商接口的嵌套实例,它为 Kafka Streams 提供追踪工具。对于 OpenTelemetry,此过程相同,但您需要创建一个自定义TracingKafkaClientSupplier类以提供支持。 - 拦截器工具
- 要进行拦截器检测,请将追踪功能添加到 Kafka Streams producer 和消费者配置中。
先决条件
您已为 客户端 初始化追踪。
您可以通过在项目中添加追踪 JAR 作为依赖项来启用 Kafka Streams 应用程序中的检测。
-
要使用 OpenTelemetry 检测 Kafka Streams,您需要编写自定义
TracingKafkaClientSupplier。 自定义
TracingKafkaClientSupplier可以扩展 Kafka 的DefaultKafkaClientSupplier,覆盖生成者和消费者创建方法,将实例嵌套与遥测相关的代码。自定义
TracingKafkaClientSupplier示例private class TracingKafkaClientSupplier extends DefaultKafkaClientSupplier { @Override public Producer<byte[], byte[]> getProducer(Map<String, Object> config) { KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get()); return telemetry.wrap(super.getProducer(config)); } @Override public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) { KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get()); return telemetry.wrap(super.getConsumer(config)); } @Override public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) { return this.getConsumer(config); } @Override public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> config) { return this.getConsumer(config); } }
流程
为每个 Kafka Streams API 应用程序执行这些步骤。
要使用 decorator 模式,请创建一个
TracingKafkaClientSupplier供应商接口实例,然后向KafkaStreams提供供应商接口。decorator 工具示例
KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer); KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier); streams.start();要使用拦截器,请在 Kafka Streams producer 和消费者配置中设置拦截器类。
TracingProducerInterceptor和TracingConsumerInterceptorinterceptor 类负责追踪功能。使用拦截器的制作者和使用者配置示例
props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
9.3.6. 介绍不同的 OpenTelemetry 追踪系统 复制链接链接已复制到粘贴板!
除了默认的 Jaeger 系统,您可以指定 OpenTelemetry 支持的其他追踪系统。您可以通过在 AMQ Streams 提供的 Kafka 镜像中添加所需的工件。还必须设置任何必要的实施特定环境变量。然后,您可以使用 OTEL_TRACES_EXPORTER 环境变量启用新的追踪实施。
此流程演示了如何实施 Zipkin tracing。
流程
将追踪工件添加到 AMQ Streams Kafka 镜像的
/opt/kafka/libs/目录中。您可以使用 红帽生态系统目录 上的 Kafka 容器镜像作为创建新自定义镜像的基础镜像。
Zipkin 的 OpenTelemetry 构件
io.opentelemetry:opentelemetry-exporter-zipkin为新的追踪实施设置追踪器和端点。
Zikpin tracer 配置示例
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mm2-cluster spec: #... template: connectContainer: env: - name: OTEL_SERVICE_NAME value: my-zipkin-service - name: OTEL_EXPORTER_ZIPKIN_ENDPOINT value: http://zipkin-exporter-host-name:9411/api/v2/spans1 - name: OTEL_TRACES_EXPORTER value: zipkin2 tracing: type: opentelemetry #...
9.3.7. 自定义范围名称 复制链接链接已复制到粘贴板!
追踪 span 是 Jaeger 中的逻辑工作单元,包括操作名称、开始时间和持续时间。span 具有内置名称,但您可以在使用 Kafka 客户端工具中指定自定义 span 名称。
指定自定义范围名称是可选的,只有在生成者和消费者客户端检测或 Kafka Streams 检测中使用 decorator 模式时才适用。
9.3.7.1. 为 OpenTelemetry 指定 span 名称 复制链接链接已复制到粘贴板!
无法使用 OpenTelemetry 直接指定自定义 span 名称。相反,您可以通过向客户端应用程序添加代码来获取 span 名称,以提取其他标签和属性。
提取属性的代码示例
//Defines attribute extraction for a producer
private static class ProducerAttribExtractor implements AttributesExtractor < ProducerRecord < ? , ? > , Void > {
@Override
public void onStart(AttributesBuilder attributes, ProducerRecord < ? , ? > producerRecord) {
set(attributes, AttributeKey.stringKey("prod_start"), "prod1");
}
@Override
public void onEnd(AttributesBuilder attributes, ProducerRecord < ? , ? > producerRecord, @Nullable Void unused, @Nullable Throwable error) {
set(attributes, AttributeKey.stringKey("prod_end"), "prod2");
}
}
//Defines attribute extraction for a consumer
private static class ConsumerAttribExtractor implements AttributesExtractor < ConsumerRecord < ? , ? > , Void > {
@Override
public void onStart(AttributesBuilder attributes, ConsumerRecord < ? , ? > producerRecord) {
set(attributes, AttributeKey.stringKey("con_start"), "con1");
}
@Override
public void onEnd(AttributesBuilder attributes, ConsumerRecord < ? , ? > producerRecord, @Nullable Void unused, @Nullable Throwable error) {
set(attributes, AttributeKey.stringKey("con_end"), "con2");
}
}
//Extracts the attributes
public static void main(String[] args) throws Exception {
Map < String, Object > configs = new HashMap < > (Collections.singletonMap(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));
System.setProperty("otel.traces.exporter", "jaeger");
System.setProperty("otel.service.name", "myapp1");
KafkaTracing tracing = KafkaTracing.newBuilder(GlobalOpenTelemetry.get())
.addProducerAttributesExtractors(new ProducerAttribExtractor())
.addConsumerAttributesExtractors(new ConsumerAttribExtractor())
.build();
9.3.7.2. 为 OpenTracing 指定范围名称 复制链接链接已复制到粘贴板!
要为 OpenTracing 指定自定义范围名称,请在检测生产者和消费者时将 BiFunction 对象作为参数传递。
有关内置名称和指定自定义范围名称以检测客户端应用代码的更多信息,请参阅 OpenTracing Apache Kafka 客户端检测。