2.3. Kafka MirrorMaker 集群配置
本章论述了如何在 AMQ Streams 集群中配置 Kafka 镜像Maker 部署以在 Kafka 集群间复制数据。
您可以使用带有 MirrorMaker 或 MirrorMaker 2.0 的 AMQ Streams。MirrorMaker 2.0 是最新版本,为在 Kafka 集群间镜像数据提供了一种更有效的方法。
如果使用 MirrorMaker,您需要配置 KafkaMirrorMaker
资源。
以下流程演示了如何配置资源:
KafkaMirrorMaker
资源的完整 schema 信息包括在 KafkaMirrorMaker schema 引用 中。
2.3.1. 配置 Kafka MirrorMaker
使用 KafkaMirrorMaker
资源的属性来配置 Kafka MirrorMaker 部署。
您可以使用 TLS 或 SASL 身份验证为生产者和使用者配置访问控制。此流程演示了在使用者和制作者侧使用 TLS 加密和身份验证的配置。
先决条件
有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams :
- 源和目标 Kafka 集群必须可用
步骤
编辑
KafkaMirrorMaker
资源的spec
属性。您可以配置的属性显示在此示例配置中:
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaMirrorMaker metadata: name: my-mirror-maker spec: replicas: 3 1 consumer: bootstrapServers: my-source-cluster-kafka-bootstrap:9092 2 groupId: "my-group" 3 numStreams: 2 4 offsetCommitInterval: 120000 5 tls: 6 trustedCertificates: - secretName: my-source-cluster-ca-cert certificate: ca.crt authentication: 7 type: tls certificateAndKey: secretName: my-source-secret certificate: public.crt key: private.key config: 8 max.poll.records: 100 receive.buffer.bytes: 32768 ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 9 ssl.enabled.protocols: "TLSv1.2" ssl.protocol: "TLSv1.2" ssl.endpoint.identification.algorithm: HTTPS 10 producer: bootstrapServers: my-target-cluster-kafka-bootstrap:9092 abortOnSendFailure: false 11 tls: trustedCertificates: - secretName: my-target-cluster-ca-cert certificate: ca.crt authentication: type: tls certificateAndKey: secretName: my-target-secret certificate: public.crt key: private.key config: compression.type: gzip batch.size: 8192 ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 12 ssl.enabled.protocols: "TLSv1.2" ssl.protocol: "TLSv1.2" ssl.endpoint.identification.algorithm: HTTPS 13 whitelist: "my-topic|other-topic" 14 resources: 15 requests: cpu: "1" memory: 2Gi limits: cpu: "2" memory: 2Gi logging: 16 type: inline loggers: mirrormaker.root.logger: "INFO" readinessProbe: 17 initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 metrics: 18 lowercaseOutputName: true rules: - pattern: "kafka.server<type=(.+), name=(.+)PerSec\\w*><>Count" name: "kafka_server_$1_$2_total" - pattern: "kafka.server<type=(.+), name=(.+)PerSec\\w*, topic=(.+)><>Count" name: "kafka_server_$1_$2_total" labels: topic: "$3" jvmOptions: 19 "-Xmx": "1g" "-Xms": "1g" image: my-org/my-image:latest 20 template: 21 pod: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: application operator: In values: - postgresql - mongodb topologyKey: "kubernetes.io/hostname" connectContainer: 22 env: - name: JAEGER_SERVICE_NAME value: my-jaeger-service - name: JAEGER_AGENT_HOST value: jaeger-agent-name - name: JAEGER_AGENT_PORT value: "6831" tracing: 23 type: jaeger
- 1
- 2
- 为 使用者和生产者引导服务器.
- 3
- 4
- 5
- 6
- 使用密钥名称进行 TLS 加密,其中 TLS 证书存储在 X.509 格式供使用者或生产者使用。如果证书存储在同一个 secret 中,则可以多次列出证书。
- 7
- 8
- 9
- 使用特定 密码套件 为 TLS 版本运行外部侦听器的 SSL 属性。
- 10
- 通过将 设置为
HTTPS
来 启用主机名验证。空字符串将禁用验证。 - 11
- 如果 abort
OnSendFailure
属性 设为true
,则 Kafka MirrorMaker 将退出,容器将在发送失败消息后重启。 - 12
- 使用特定 密码套件 为 TLS 版本运行外部侦听器的 SSL 属性。
- 13
- 通过将 设置为
HTTPS
来 启用主机名验证。空字符串将禁用验证。 - 14
- 从源镜像到目标 Kafka 集群 的主题白名单 。
- 15
- 16
- 指定日志记录器和日志级别 通过 ConfigMap 直接(
内联
)或间接(外部
)添加。自定义 ConfigMap 必须放在log4j.properties 或
log4j2.properties
键下。MirrorMaker 有一个名为mirrormaker.root.logger
的日志记录器。您可以将日志级别设置为 INFO、ERROR、WARN、TRACE、DEBUG、FATAL 或 OFF。 - 17
- 健康检查以了解 何时重新启动容器(存活度)以及容器何时可以接受流量(就绪度)。
- 18
- Prometheus metrics,本例中通过 Prometheus JMX 导出器配置启用的 Prometheus 指标。您可以使用
metrics: {}
来在没有进一步配置的情况下启用指标。 - 19
- 用于优化运行 Kafka MirrorMaker 的虚拟机(VM)性能的 JVM 配置选项。
- 20
- 高级选项:容器镜像配置,建议仅在特殊情况下使用。
- 21
- 模板自定义.在这里,pod 被调度为反关联性,因此 pod 不会调度到具有相同主机名的节点。
- 22
- 还 使用 Jaeger 为分布式追踪设置 环境变量。
- 23
警告将 abort
OnSendFailure
属性设置为false
时,制作者会尝试发送主题中的下一个消息。原始消息可能会丢失,因为没有尝试重新发送失败的消息。创建或更新资源:
oc apply -f <your-file>