2.4.4. 使用 MirrorMaker 2.0 在 Kafka 集群间同步数据


使用 MirrorMaker 2.0 通过配置同步 Kafka 集群之间的数据。

配置必须指定:

  • 每个 Kafka 集群
  • 每个集群的连接信息,包括 TLS 身份验证
  • 复制流和方向

    • 集群到集群
    • 主题

使用 KafkaMirrorMaker2 资源的属性来配置 Kafka MirrorMaker 2.0 部署。

注意

以前版本的 MirrorMaker 继续受到支持。如果要使用为之前版本配置的资源,则必须将其更新为 MirrorMaker 2.0 所支持的格式。

MirrorMaker 2.0 为复制因素等属性提供默认配置值。最小配置(默认值保持不变)会类似如下:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 2.6.0
  connectCluster: "my-cluster-target"
  clusters:
  - alias: "my-cluster-source"
    bootstrapServers: my-cluster-source-kafka-bootstrap:9092
  - alias: "my-cluster-target"
    bootstrapServers: my-cluster-target-kafka-bootstrap:9092
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector: {}

您可以使用 TLS 或 SASL 身份验证为源和目标集群配置访问控制。此流程显示为源和目标集群使用 TLS 加密和身份验证的配置。

先决条件

  • 有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams

  • 源和目标 Kafka 集群必须可用

步骤

  1. 编辑 KafkaMirrorMaker2 资源的 spec 属性。

    您可以配置的属性显示在此示例配置中:

    apiVersion: kafka.strimzi.io/v1alpha1
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mirror-maker2
    spec:
      version: 2.6.0 1
      replicas: 3 2
      connectCluster: "my-cluster-target" 3
      clusters: 4
      - alias: "my-cluster-source" 5
        authentication: 6
          certificateAndKey:
            certificate: source.crt
            key: source.key
            secretName: my-user-source
          type: tls
        bootstrapServers: my-cluster-source-kafka-bootstrap:9092 7
        tls: 8
          trustedCertificates:
          - certificate: ca.crt
            secretName: my-cluster-source-cluster-ca-cert
      - alias: "my-cluster-target" 9
        authentication: 10
          certificateAndKey:
            certificate: target.crt
            key: target.key
            secretName: my-user-target
          type: tls
        bootstrapServers: my-cluster-target-kafka-bootstrap:9092 11
        config: 12
          config.storage.replication.factor: 1
          offset.storage.replication.factor: 1
          status.storage.replication.factor: 1
          ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 13
          ssl.enabled.protocols: "TLSv1.2"
          ssl.protocol: "TLSv1.2"
          ssl.endpoint.identification.algorithm: HTTPS 14
        tls: 15
          trustedCertificates:
          - certificate: ca.crt
            secretName: my-cluster-target-cluster-ca-cert
      mirrors: 16
      - sourceCluster: "my-cluster-source" 17
        targetCluster: "my-cluster-target" 18
        sourceConnector: 19
          config:
            replication.factor: 1 20
            offset-syncs.topic.replication.factor: 1 21
            sync.topic.acls.enabled: "false" 22
            replication.policy.separator: "" 23
            replication.policy.class: "io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy" 24
        heartbeatConnector: 25
          config:
            heartbeats.topic.replication.factor: 1 26
        checkpointConnector: 27
          config:
            checkpoints.topic.replication.factor: 1 28
        topicsPattern: ".*" 29
        groupsPattern: "group1|group2|group3" 30
      resources: 31
        requests:
          cpu: "1"
          memory: 2Gi
        limits:
          cpu: "2"
          memory: 2Gi
      logging: 32
        type: inline
        loggers:
          connect.root.logger.level: "INFO"
      readinessProbe: 33
        initialDelaySeconds: 15
        timeoutSeconds: 5
      livenessProbe:
        initialDelaySeconds: 15
        timeoutSeconds: 5
      jvmOptions: 34
        "-Xmx": "1g"
        "-Xms": "1g"
      image: my-org/my-image:latest 35
      template: 36
        pod:
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: application
                        operator: In
                        values:
                          - postgresql
                          - mongodb
                  topologyKey: "kubernetes.io/hostname"
        connectContainer: 37
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing:
        type: jaeger 38
      externalConfiguration: 39
        env:
          - name: AWS_ACCESS_KEY_ID
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsAccessKey
          - name: AWS_SECRET_ACCESS_KEY
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsSecretAccessKey
    1
    Kafka Connect 版本
    2
    3
    Kafka Connect 的集群 别名.
    4
    正在同步 Kafka 集群的规格。
    5
    源 Kafka 集群的集群 别名
    6
    使用此处所示的 TLS 机制 (使用 OAuth bearer 令牌 或基于 SASL 的 SCRAM-SHA-512PLAIN 机制)对源集群进行身份验证。
    7
    8
    使用源 Kafka 集群的 TLS 证书以 X.509 格式存储的密钥名称进行 TLS 加密。如果证书存储在同一个 secret 中,则可以多次列出证书。
    9
    目标 Kafka 集群的集群 别名
    10
    目标 Kafka 集群的身份验证的配置方式与源 Kafka 集群相同。
    11
    12
    Kafka Connect 配置.标准 Apache Kafka 配置可能会提供,仅限于不是由 AMQ Streams 直接管理的属性。
    13
    使用特定 密码套件 为 TLS 版本运行外部侦听器的 SSL 属性
    14
    通过将 设置为 HTTPS启用主机名验证。空字符串将禁用验证。
    15
    目标 Kafka 集群的 TLS 加密配置方式与源 Kafka 集群相同。
    16
    17
    MirrorMaker 2.0 连接器使用的源集群的集群 别名
    18
    MirrorMaker 2.0 连接器使用的目标集群的集群 别名
    19
    创建远程主题的 MirrorSourceConnector配置配置 会覆盖默认配置选项。
    20
    在目标集群中创建的已镜像主题的复制因素。
    21
    MirrorSourceConnector offset-syncs 内部主题的复制因素,用于映射源和目标集群的偏移。
    22
    启用 ACL 规则同步 时,将应用 ACL 来同步主题。默认值是 true
    23
    定义用于重命名远程主题的分隔符。
    24
    添加可覆盖远程主题自动重命名的策略。该主题不会用源集群的名称来附加名称,而是保留其原始名称。此可选设置对主动/被动备份和数据迁移很有用。
    25
    执行连接检查 的 MirrorHeartbeatConnector配置配置 会覆盖默认配置选项。
    26
    在目标集群中创建的心跳主题的复制因素。
    27
    跟踪偏移的 MirrorCheckpointConnector配置配置 会覆盖默认配置选项。
    28
    在目标集群中创建的检查点主题的复制因素。
    29
    定义为正则表达式模式的 源集群进行主题复制。这里我们请求所有主题。
    30
    来自 定义为正则表达式模式的 源集群的使用者组复制。在这里,我们按名称请求三个消费者组。您可以使用逗号分隔的列表。
    31
    请求保留 支持的资源、当前 cpu 和 memory ,以及限制,以指定可消耗的最大资源。
    32
    指定 Kafka 连接日志记录器和日志级别 直接(内联)或通过ConfigMap 间接(外部)添加。自定义 ConfigMap 必须放在 log4j.properties 或 log4j2.properties 键下。对于 Kafka Connect log4j.rootLogger 日志记录器,您可以将日志级别设置为 INFO、ERROR、WARN、TRACE、DEBUG、FATAL 或 OFF。
    33
    健康检查以了解 何时重新启动容器(存活度)以及容器何时可以接受流量(就绪度)。
    34
    用于优化运行 Kafka MirrorMaker 的虚拟机(VM)性能的 JVM 配置选项
    35
    高级选项:容器镜像配置,建议仅在特殊情况下使用。
    36
    模板自定义.在这里,pod 被调度为反关联性,因此 pod 不会调度到具有相同主机名的节点。
    37
    38
    39
    作为环境变量挂载到 Kafka MirrorMaker 的 OpenShift Secret 的外部配置
  2. 创建或更新资源:

    oc apply -f <your-file>
Red Hat logoGithubRedditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.