2.3. Kafka MirrorMaker 集群配置


本章论述了如何在 AMQ Streams 集群中配置 Kafka 镜像Maker 部署以在 Kafka 集群间复制数据。

您可以使用带有 MirrorMaker 或 MirrorMaker 2.0 的 AMQ Streams。MirrorMaker 2.0 是最新版本,为在 Kafka 集群间镜像数据提供了一种更有效的方法。

如果使用 MirrorMaker,您需要配置 KafkaMirrorMaker 资源。

以下流程演示了如何配置资源:

KafkaMirrorMaker 资源的完整 schema 信息包括在 KafkaMirrorMaker schema 引用 中。

2.3.1. 配置 Kafka MirrorMaker

使用 KafkaMirrorMaker 资源的属性来配置 Kafka MirrorMaker 部署。

您可以使用 TLS 或 SASL 身份验证为生产者和使用者配置访问控制。此流程演示了在使用者和制作者侧使用 TLS 加密和身份验证的配置。

先决条件

  • 有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams

  • 源和目标 Kafka 集群必须可用

步骤

  1. 编辑 KafkaMirrorMaker 资源的 spec 属性。

    您可以配置的属性显示在此示例配置中:

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaMirrorMaker
    metadata:
      name: my-mirror-maker
    spec:
      replicas: 3 1
      consumer:
        bootstrapServers: my-source-cluster-kafka-bootstrap:9092 2
        groupId: "my-group" 3
        numStreams: 2 4
        offsetCommitInterval: 120000 5
        tls: 6
          trustedCertificates:
          - secretName: my-source-cluster-ca-cert
            certificate: ca.crt
        authentication: 7
          type: tls
          certificateAndKey:
            secretName: my-source-secret
            certificate: public.crt
            key: private.key
        config: 8
          max.poll.records: 100
          receive.buffer.bytes: 32768
          ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 9
          ssl.enabled.protocols: "TLSv1.2"
          ssl.protocol: "TLSv1.2"
          ssl.endpoint.identification.algorithm: HTTPS 10
      producer:
        bootstrapServers: my-target-cluster-kafka-bootstrap:9092
        abortOnSendFailure: false 11
        tls:
          trustedCertificates:
          - secretName: my-target-cluster-ca-cert
            certificate: ca.crt
        authentication:
          type: tls
          certificateAndKey:
            secretName: my-target-secret
            certificate: public.crt
            key: private.key
        config:
          compression.type: gzip
          batch.size: 8192
          ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 12
          ssl.enabled.protocols: "TLSv1.2"
          ssl.protocol: "TLSv1.2"
          ssl.endpoint.identification.algorithm: HTTPS 13
      whitelist: "my-topic|other-topic" 14
      resources: 15
        requests:
          cpu: "1"
          memory: 2Gi
        limits:
          cpu: "2"
          memory: 2Gi
      logging: 16
        type: inline
        loggers:
          mirrormaker.root.logger: "INFO"
      readinessProbe: 17
        initialDelaySeconds: 15
        timeoutSeconds: 5
      livenessProbe:
        initialDelaySeconds: 15
        timeoutSeconds: 5
      metrics: 18
        lowercaseOutputName: true
        rules:
          - pattern: "kafka.server<type=(.+), name=(.+)PerSec\\w*><>Count"
            name: "kafka_server_$1_$2_total"
          - pattern: "kafka.server<type=(.+), name=(.+)PerSec\\w*,
            topic=(.+)><>Count"
            name: "kafka_server_$1_$2_total"
            labels:
              topic: "$3"
      jvmOptions: 19
        "-Xmx": "1g"
        "-Xms": "1g"
      image: my-org/my-image:latest 20
      template: 21
        pod:
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: application
                        operator: In
                        values:
                          - postgresql
                          - mongodb
                  topologyKey: "kubernetes.io/hostname"
        connectContainer: 22
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing: 23
        type: jaeger
    1
    2
    3
    4
    5
    6
    使用密钥名称进行 TLS 加密,其中 TLS 证书存储在 X.509 格式供使用者或生产者使用。如果证书存储在同一个 secret 中,则可以多次列出证书。
    7
    对使用者或生产者进行身份验证,使用此处所示的 TLS 机制,使用 OAuth bearer 令牌 或基于 SASL 的 SCRAM-SHA-512PLAIN 机制。
    8
    用于 使用者和 生产者 的 Kafka 配置选项。
    9
    使用特定 密码套件 为 TLS 版本运行外部侦听器的 SSL 属性
    10
    通过将 设置为 HTTPS启用主机名验证。空字符串将禁用验证。
    11
    如果 abort OnSendFailure 属性 设为 true,则 Kafka MirrorMaker 将退出,容器将在发送失败消息后重启。
    12
    使用特定 密码套件 为 TLS 版本运行外部侦听器的 SSL 属性
    13
    通过将 设置为 HTTPS启用主机名验证。空字符串将禁用验证。
    14
    从源镜像到目标 Kafka 集群 的主题白名单
    15
    请求保留 支持的资源、当前 cpu 和 memory ,以及限制,以指定可消耗的最大资源。
    16
    指定日志记录器和日志级别 通过 ConfigMap 直接(内联)或间接(外部)添加。自定义 ConfigMap 必须放在 log4j.properties 或 log4j2.properties 键下。MirrorMaker 有一个名为 mirrormaker.root.logger 的日志记录器。您可以将日志级别设置为 INFO、ERROR、WARN、TRACE、DEBUG、FATAL 或 OFF。
    17
    健康检查以了解 何时重新启动容器(存活度)以及容器何时可以接受流量(就绪度)。
    18
    Prometheus metrics,本例中通过 Prometheus JMX 导出器配置启用的 Prometheus 指标。您可以使用 metrics: {} 来在没有进一步配置的情况下启用指标。
    19
    用于优化运行 Kafka MirrorMaker 的虚拟机(VM)性能的 JVM 配置选项
    20
    高级选项:容器镜像配置,建议仅在特殊情况下使用。
    21
    模板自定义.在这里,pod 被调度为反关联性,因此 pod 不会调度到具有相同主机名的节点。
    22
    23
    警告

    将 abort OnSendFailure 属性设置为 false 时,制作者会尝试发送主题中的下一个消息。原始消息可能会丢失,因为没有尝试重新发送失败的消息。

  2. 创建或更新资源:

    oc apply -f <your-file>
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.