2.4.4. 使用 MirrorMaker 2.0 在 Kafka 集群间同步数据


使用 MirrorMaker 2.0 通过配置同步 Kafka 集群之间的数据。

配置必须指定:

  • 每个 Kafka 集群
  • 每个集群的连接信息,包括 TLS 身份验证
  • 复制流和方向

    • 集群到集群
    • 主题

使用 KafkaMirrorMaker2 资源的属性来配置 Kafka MirrorMaker 2.0 部署。

注意

以前版本的 MirrorMaker 继续受到支持。如果要使用为之前版本配置的资源,则必须将其更新为 MirrorMaker 2.0 所支持的格式。

MirrorMaker 2.0 为复制因素等属性提供默认配置值。最小配置(默认值保持不变)会类似如下:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 2.8.0
  connectCluster: "my-cluster-target"
  clusters:
  - alias: "my-cluster-source"
    bootstrapServers: my-cluster-source-kafka-bootstrap:9092
  - alias: "my-cluster-target"
    bootstrapServers: my-cluster-target-kafka-bootstrap:9092
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector: {}

您可以使用 TLS 或 SASL 身份验证为源和目标集群配置访问控制。此流程显示为源和目标集群使用 TLS 加密和身份验证的配置。

先决条件

  • 有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams

  • 源和目标 Kafka 集群必须可用

流程

  1. 编辑 KafkaMirrorMaker2 资源的 spec 属性。

    您可以配置的属性显示在此示例配置中:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mirror-maker2
    spec:
      version: 2.8.0 1
      replicas: 3 2
      connectCluster: "my-cluster-target" 3
      clusters: 4
      - alias: "my-cluster-source" 5
        authentication: 6
          certificateAndKey:
            certificate: source.crt
            key: source.key
            secretName: my-user-source
          type: tls
        bootstrapServers: my-cluster-source-kafka-bootstrap:9092 7
        tls: 8
          trustedCertificates:
          - certificate: ca.crt
            secretName: my-cluster-source-cluster-ca-cert
      - alias: "my-cluster-target" 9
        authentication: 10
          certificateAndKey:
            certificate: target.crt
            key: target.key
            secretName: my-user-target
          type: tls
        bootstrapServers: my-cluster-target-kafka-bootstrap:9092 11
        config: 12
          config.storage.replication.factor: 1
          offset.storage.replication.factor: 1
          status.storage.replication.factor: 1
          ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 13
          ssl.enabled.protocols: "TLSv1.2"
          ssl.protocol: "TLSv1.2"
          ssl.endpoint.identification.algorithm: HTTPS 14
        tls: 15
          trustedCertificates:
          - certificate: ca.crt
            secretName: my-cluster-target-cluster-ca-cert
      mirrors: 16
      - sourceCluster: "my-cluster-source" 17
        targetCluster: "my-cluster-target" 18
        sourceConnector: 19
          tasksMax: 10 20
          config:
            replication.factor: 1 21
            offset-syncs.topic.replication.factor: 1 22
            sync.topic.acls.enabled: "false" 23
            refresh.topics.interval.seconds: 60 24
            replication.policy.separator: "" 25
            replication.policy.class: "io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy" 26
        heartbeatConnector: 27
          config:
            heartbeats.topic.replication.factor: 1 28
        checkpointConnector: 29
          config:
            checkpoints.topic.replication.factor: 1 30
            refresh.groups.interval.seconds: 600 31
            sync.group.offsets.enabled: true 32
            sync.group.offsets.interval.seconds: 60 33
            emit.checkpoints.interval.seconds: 60 34
            replication.policy.class: "io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy"
        topicsPattern: ".*" 35
        groupsPattern: "group1|group2|group3" 36
      resources: 37
        requests:
          cpu: "1"
          memory: 2Gi
        limits:
          cpu: "2"
          memory: 2Gi
      logging: 38
        type: inline
        loggers:
          connect.root.logger.level: "INFO"
      readinessProbe: 39
        initialDelaySeconds: 15
        timeoutSeconds: 5
      livenessProbe:
        initialDelaySeconds: 15
        timeoutSeconds: 5
      jvmOptions: 40
        "-Xmx": "1g"
        "-Xms": "1g"
      image: my-org/my-image:latest 41
      template: 42
        pod:
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: application
                        operator: In
                        values:
                          - postgresql
                          - mongodb
                  topologyKey: "kubernetes.io/hostname"
        connectContainer: 43
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing:
        type: jaeger 44
      externalConfiguration: 45
        env:
          - name: AWS_ACCESS_KEY_ID
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsAccessKey
          - name: AWS_SECRET_ACCESS_KEY
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsSecretAccessKey
    1
    Kafka Connect 和 Mirror Maker 2.0 版本,版本 将始终相同。
    2
    3
    Kafka Connect 的 Kafka 集群别名,它必须指定 目标 Kafka 集群。Kafka 集群由 Kafka Connect 用于其内部主题。
    4
    正在同步 Kafka 集群的规格。
    5
    源 Kafka 集群的集群 别名
    6
    使用此处所示的 TLS 机制 (使用 OAuth bearer 令牌 或基于 SASL 的 SCRAM-SHA-512PLAIN 机制)对源集群进行身份验证。
    7
    8
    使用源 Kafka 集群的 TLS 证书以 X.509 格式存储的密钥名称进行 TLS 加密。如果证书存储在同一个 secret 中,则可以多次列出证书。
    9
    目标 Kafka 集群的集群 别名
    10
    目标 Kafka 集群的身份验证的配置方式与源 Kafka 集群相同。
    11
    12
    Kafka Connect 配置.标准 Apache Kafka 配置可能会提供,仅限于不是由 AMQ Streams 直接管理的属性。
    13
    使用特定 密码套件 为 TLS 版本运行外部侦听器的 SSL 属性
    14
    通过将 设置为 HTTPS启用主机名验证。空字符串将禁用验证。
    15
    目标 Kafka 集群的 TLS 加密配置方式与源 Kafka 集群相同。
    16
    17
    MirrorMaker 2.0 连接器使用的源集群的集群 别名
    18
    MirrorMaker 2.0 连接器使用的目标集群的集群 别名
    19
    创建远程主题的 MirrorSourceConnector配置配置 会覆盖默认配置选项。
    20
    连接器可以创建的任务数量上限。任务处理数据复制,并并行运行。如果基础架构支持处理开销,增加这个值可以提高吞吐量。Kafka Connect 在集群成员之间分发任务。如果任务数量超过工作程序,则为工作程序分配多项任务。对于接收器连接器,旨在为每个主题分区使用一个任务。对于源连接器,可以并行运行的任务数量也可能取决于外部系统。如果无法实现并行性,连接器会创建少于任务的最大数量。
    21
    在目标集群中创建的已镜像主题的复制因素。
    22
    MirrorSourceConnector offset-syncs 内部主题的复制因素,用于映射源和目标集群的偏移。
    23
    启用 ACL 规则同步 时,将应用 ACL 来同步主题。默认值为 true
    24
    可选设置,可更改新主题的检查频率。默认为每 10 分钟检查一次。
    25
    定义用于重命名远程主题的分隔符。
    26
    添加可覆盖远程主题自动重命名的策略。该主题不会用源集群的名称来附加名称,而是保留其原始名称。此可选设置对主动/被动备份和数据迁移很有用。要配置主题偏移同步,还必须为 checkpointConnector.config 设置此属性。
    27
    执行连接检查 的 MirrorHeartbeatConnector配置配置 会覆盖默认配置选项。
    28
    在目标集群中创建的心跳主题的复制因素。
    29
    跟踪偏移的 MirrorCheckpointConnector配置配置 会覆盖默认配置选项。
    30
    在目标集群中创建的检查点主题的复制因素。
    31
    可选设置,以更改新使用者组的检查频率。默认为每 10 分钟检查一次。
    32
    可选设置来同步消费者组偏移,这对于主动/被动配置中的恢复非常有用。默认情况下不启用同步。
    33
    如果启用了消费者组偏移的同步,您可以调整同步的频率。
    34
    调整检查偏移跟踪的频率。如果更改了偏移同步的频率,您可能还需要调整这些检查的频率。
    35
    定义为正则表达式模式的 源集群进行主题复制。这里我们请求所有主题。
    36
    来自 定义为正则表达式模式的 源集群的使用者组复制。在这里,我们按名称请求三个消费者组。您可以使用逗号分隔的列表。
    37
    请求保留 支持的资源、当前 cpu 和 memory ,以及限制,以指定可消耗的最大资源。
    38
    指定 Kafka 连接日志记录器和日志级别 直接(内联)或通过ConfigMap 间接(外部)添加。自定义 ConfigMap 必须放在 log4j.properties 或 log4j2.properties 键下。对于 Kafka Connect log4j.rootLogger 日志记录器,您可以将日志级别设置为 INFO、ERROR、WARN、TRACE、DEBUG、FATAL 或 OFF。
    39
    健康检查以了解 何时重新启动容器(存活度)以及容器何时可以接受流量(就绪度)。
    40
    用于优化运行 Kafka MirrorMaker 的虚拟机(VM)性能的 JVM 配置选项
    41
    ADVANCED OPTION: 容器镜像配置,只在特殊情况下推荐这样做。
    42
    模板自定义.在这里,pod 被调度为反关联性,因此 pod 不会调度到具有相同主机名的节点。
    43
    44
    45
    作为环境变量挂载到 Kafka MirrorMaker 的 OpenShift Secret 的外部配置。您还可以使用 Kubernetes 配置提供程序 从外部来源加载配置值
  2. 创建或更新资源:

    oc apply -f MIRRORMAKER-CONFIGURATION-FILE
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.