2.4. Kafka MirrorMaker 2 集群配置


使用 KafkaMirrorMaker2 资源配置 Kafka MirrorMaker 2 部署。MirrorMaker 2 在两个或多个 Kafka 集群之间复制数据,并在数据中心之间复制数据。

第 6.2.128 节 “KafkaMirrorMaker2 模式参考” 描述 KafkaMirrorMaker2 资源的完整模式。

MirrorMaker 2 资源配置与之前的 MirrorMaker 版本不同。如果您选择使用 MirrorMaker 2,目前没有传统支持,因此任何资源都必须手动转换为新格式。

2.4.1. MirrorMaker 2 数据复制

跨集群的数据复制支持需要以下情况:

  • 在系统失败时恢复数据
  • 用于分析的数据聚合
  • 对特定集群的数据访问限制
  • 在特定位置置备数据以改进延迟

2.4.1.1. MirrorMaker 2 配置

MirrorMaker 2 使用来自源 Kafka 集群的信息,并将其写入目标 Kafka 集群。

MirrorMaker 2 使用:

  • 源集群配置使用来自源集群的数据
  • 将数据输出到目标集群的目标集群配置

MirrorMaker 2 基于 Kafka Connect 框架,连接器 管理集群之间的数据传输。

您可以配置 MirrorMaker 2 以定义 Kafka Connect 部署,包括源和目标集群的连接详情,然后运行一组 MirrorMaker 2 连接器来进行连接。

MirrorMaker 2 由以下连接器组成:

MirrorSourceConnector
源连接器将主题从源集群复制到目标集群。它还复制 ACL,且是 MirrorCheckpointConnector 才能运行所必需的。
MirrorCheckpointConnector
checkpoint 连接器定期跟踪偏移。如果启用,它还会在源和目标集群之间同步消费者组偏移。
MirrorHeartbeatConnector
heartbeat 连接器会定期检查源和目标集群之间的连接。
注意

如果您使用 User Operator 管理 ACL,则无法通过连接器进行 ACL 复制。

将数据从源集群镜像 (mirror)到目标集群的过程是异步的。每个 MirrorMaker 2 实例将数据从一个源集群镜像到一个目标集群。您可以使用多个 MirrorMaker 2 实例在任意数量的集群间镜像数据。

图 2.1. 在两个集群间复制

MirrorMaker 2 复制

默认情况下,检查源集群中的新主题每 10 分钟进行。您可以通过在源连接器配置中添加 refresh.topics.interval.seconds 来更改频率。

2.4.1.1.1. 集群配置

您可以在 主动/被动或主动/主动 集群配置中 使用 MirrorMaker 2。

主动/主动集群配置
主动/主动配置有两个活跃集群来复制数据。应用程序可以使用任一集群。每个集群都可以提供相同的数据。这样,您可以在不同的地理位置提供相同的数据。因为消费者组在两个集群中都活跃,复制主题的使用者偏移不会重新同步到源集群。
主动/被动集群配置
主动/被动配置具有主动集群,将数据复制到被动集群。被动集群保持待机状态。在系统失败时,您可以使用被动集群进行数据恢复。

预期的结构是,生成者和消费者仅连接到活跃集群。每个目标目的地都需要一个 MirrorMaker 2 集群。

2.4.1.1.2. 双向复制(active/active)

MirrorMaker 2 架构支持 主动/主动 集群配置中的双向复制。

每个集群使用 sourceremote 主题的概念复制其他集群的数据。由于同一主题存储在每个集群中,所以远程主题由 MirrorMaker 2 重命名,以表示源集群。原始集群的名称前面是主题名称的前面。

图 2.2. 主题重命名

MirrorMaker 2 双向架构

通过标记原始集群,主题不会复制到该集群。

在配置需要数据聚合的架构时,通过 远程主题 复制的概念非常有用。消费者可以订阅同一集群中的源和远程主题,而无需单独的聚合集群。

2.4.1.1.3. 单向复制(active/passive)

MirrorMaker 2 架构支持 主动/被动 集群配置中的单向复制。

您可以使用 主动/被动集群 配置进行备份或将数据迁移到另一个集群。在这种情况下,您可能不希望自动重命名远程主题。

您可以通过将 IdentityReplicationPolicy 添加到源连接器配置来覆盖自动重命名。应用此配置后,主题会保留其原始名称。

2.4.1.2. 主题配置同步

MirrorMaker 2 支持源和目标集群之间的主题配置同步。您可以在 MirrorMaker 2 配置中指定源主题。MirrorMaker 2 监控源主题。MirrorMaker 2 会检测并将更改传播到源主题到远程主题。更改可能包括自动创建缺少的主题和分区。

注意

在大多数情况下,您写入本地主题并从远程主题读取。虽然对于远程主题不会阻止写操作,但应该避免它们。

2.4.1.3. 偏移跟踪

MirrorMaker 2 使用内部主题跟踪消费者组的偏移。

offset-syncs 主题
offset-syncs 主题映射复制主题元数据的源和目标偏移。
checkpoints topic
checkpoints 主题映射源和目标集群中每个消费者组中复制的主题分区的最后提交偏移量。

与 MirrorMaker 2 内部使用的一样,您不直接与这些主题交互。

MirrorCheckpointConnector 发出偏移跟踪的 检查点checkpoints 主题的偏移通过配置预先确定的间隔进行跟踪。两个主题都允许从故障转移上的正确偏移位置完全恢复复制。

offset-syncs 主题的默认位置是 源集群。您可以使用 offset-syncs.topic.location 连接器配置将其更改为 目标集群。您需要对包含该主题的集群进行读/写访问。使用目标集群作为 offset-syncs 主题的位置,即使您只对源集群具有读取访问权限,您也可以使用 MirrorMaker 2。

2.4.1.4. 同步消费者组偏移

__consumer_offsets 主题存储各个消费者组的提交偏移信息。偏移同步会定期将源集群的消费者组的使用者偏移转移到目标集群的使用者偏移量中。

偏移同步在主动/被动配置中特别有用。如果活跃集群停机,消费者应用程序可以切换到被动(standby)集群,并从最后一个传输的偏移位置获取。

要使用主题偏移同步,请通过将 sync.group.offsets.enabled 添加到检查点连接器配置来启用同步,并将属性设置为 true。默认情况下禁用同步。

在源连接器中使用 IdentityReplicationPolicy 时,还必须在检查点连接器配置中配置它。这样可确保为正确的主题应用镜像的使用者偏移。

消费者偏移只针对在目标集群中没有活跃的消费者组同步。如果消费者组位于目标集群,则无法执行同步,并返回 UNKNOWN_MEMBER_ID 错误。

如果启用,则定期从源集群进行偏移同步。您可以通过在检查点连接器配置中添加 sync.group.offsets.interval.secondsemit.checkpoints.interval.seconds 来更改频率。属性指定同步消费者组偏移的频率,以及为偏移跟踪发送检查点的频率。这两个属性的默认值为 60 秒。您还可以使用 refresh.groups.interval.seconds 属性更改检查新消费者组的频率,该属性默认每 10 分钟执行。

由于同步是基于时间的,因此消费者向被动集群切换的任何用户都会造成一些消息重复。

注意

如果您使用 Java 编写的应用,您可以使用 RemoteClusterUtils.java 实用程序通过应用同步偏移。实用程序从 checkpoints 主题获取消费者组的远程偏移。

2.4.1.5. 连接检查

MirrorHeartbeatConnector 发出 心跳 来检查集群之间的连接。

内部 心跳 主题从源集群复制。目标集群使用 heartbeat 主题检查以下内容:

  • 连接器管理集群间的连接正在运行
  • 源集群可用

2.4.2. 连接器配置

为内部连接器使用 Mirrormaker 2 连接器配置,用于编配 Kafka 集群之间的数据同步。

下表描述了连接器属性以及您配置的连接器属性。

Expand
表 2.1. MirrorMaker 2 连接器配置属性
属性sourceConnectorcheckpointConnectorheartbeatConnector
admin.timeout.ms
管理任务的超时,如检测新主题。默认为 60000 (1 分钟)。

replication.policy.class
定义远程主题命名约定的策略。默认为 org.apache.kafka.connect.mirror.DefaultReplicationPolicy

replication.policy.separator
在目标集群中用于主题命名的分隔符。默认为 . (点)。

consumer.poll.timeout.ms
轮询源集群时超时。默认值为 1000 (1 秒)。

 
offset-syncs.topic.location
offset-syncs 主题的位置,可以是 (默认)或 目标集群

 
topic.filter.class
主题过滤器,以选择要复制的主题。默认为 org.apache.kafka.connect.mirror.DefaultTopicFilter

 
config.property.filter.class
主题过滤器,以选择要复制的主题配置属性。默认为 org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter

  
config.properties.exclude
不应当复制的主题配置属性。支持以逗号分隔的属性名称和正则表达式。

  
offset.lag.max
同步远程分区前允许的最大(out-of-sync)偏移。默认值为 100

  
offset-syncs.topic.replication.factor
内部偏移同步 主题的复制因素。默认为 3

  
refresh.topics.enabled
启用检查新主题和分区。默认为 true

  
refresh.topics.interval.seconds
主题刷新的频率。默认为 600 (10 分钟)。

  
replication.factor
新主题的复制因素。默认为 2

  
sync.topic.acls.enabled
启用与源集群的 ACL 同步。默认为 true。与 User Operator 不兼容。

  
sync.topic.acls.interval.seconds
ACL 同步频率。默认为 600 (10 分钟)。

  
sync.topic.configs.enabled
启用从源集群同步主题配置。默认为 true

  
sync.topic.configs.interval.seconds
主题配置同步的频率。默认 600 (10 分钟)。

  
checkpoints.topic.replication.factor
内部 检查点 主题的复制因素。默认为 3
 

 
emit.checkpoints.enabled
启用将消费者偏移同步到目标集群。默认为 true
 

 
emit.checkpoints.interval.seconds
消费者偏移同步的频率。默认值为 60 (1 分钟)。
 

 
group.filter.class
选择要复制的消费者组的组过滤器。默认为 org.apache.kafka.connect.mirror.DefaultGroupFilter
 

 
refresh.groups.enabled
启用检查新消费者组。默认为 true
 

 
refresh.groups.interval.seconds
消费者组刷新的频率。默认为 600 (10 分钟)。
 

 
sync.group.offsets.enabled
启用将消费者组偏移同步到目标集群 __consumer_offsets 主题。默认为 false
 

 
sync.group.offsets.interval.seconds
消费者组偏移同步的频率。默认值为 60 (1 分钟)。
 

 
emit.heartbeats.enabled
在目标集群中启用连接检查。默认为 true
  

emit.heartbeats.interval.seconds
连接检查的频率。默认为 1 ( 1 秒)。
  

heartbeats.topic.replication.factor
内部 心跳 主题的复制因素。默认为 3
  

2.4.3. 连接器制作者和消费者配置

MirrorMaker 2 连接器使用内部制作者和消费者。如果需要,您可以配置这些制作者和消费者来覆盖默认设置。

例如,您可以增加 source producer 的 batch.size,将主题发送到目标 Kafka 集群,以更好地容纳大量消息。

重要

生产者和消费者配置选项取决于 MirrorMaker 2 的实施,可能随时更改。

下表描述了每个连接器的生产者和消费者,以及您可以添加配置。

Expand
表 2.2. 源连接器制作者和消费者
类型Description配置

制作者

将主题信息发送到目标 Kafka 集群。考虑在处理大量数据时调整此制作者的配置。

mirrors.sourceConnector.config: producer.override.*

制作者

写入 offset-syncs 主题,该主题映射复制主题分区的源和目标偏移。

mirrors.sourceConnector.config: producer.*

消费者

从源 Kafka 集群检索主题信息。

mirrors.sourceConnector.config: consumer.*

Expand
表 2.3. Checkpoint 连接器生成者和消费者
类型Description配置

制作者

发送消费者偏移检查点。

mirrors.checkpointConnector.config: producer.override.*

消费者

加载 offset-syncs 主题。

mirrors.checkpointConnector.config: consumer.*

注意

您可以将 offset-syncs.topic.location 设置为 target 来使用目标 Kafka 集群作为 offset-syncs 主题的位置。

Expand
表 2.4. heartbeat 连接器制作者
类型Description配置

制作者

发送心跳。

mirrors.heartbeatConnector.config: producer.override.*

下例演示了如何配置制作者和消费者。

连接器制作者和消费者配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 3.4.0
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      tasksMax: 5
      config:
        producer.override.batch.size: 327680
        producer.override.linger.ms: 100
        producer.request.timeout.ms: 30000
        consumer.fetch.max.bytes: 52428800
        # ...
    checkpointConnector:
      config:
        producer.override.request.timeout.ms: 30000
        consumer.max.poll.interval.ms: 300000
        # ...
    heartbeatConnector:
      config:
        producer.override.request.timeout.ms: 30000
        # ...

2.4.4. 指定最多的任务数

连接器创建负责在 Kafka 中移动数据的任务。每个连接器由一个或多个在运行任务的 worker pod 进行分组的任务组成。在复制大量分区或同步大量消费者组的偏移时,增加任务数量可以帮助解决性能问题。

任务并行运行。为 worker 分配一个或多个任务。单个任务由一个 worker pod 处理,因此您不需要多个 worker pod 而不是任务。如果有多个任务,worker 会处理多个任务。

您可以使用 tasksMax 属性在 MirrorMaker 配置中指定最大连接器任务数量。在不指定最多任务数量的情况下,默认设置是一个任务。

heartbeat 连接器始终使用单个任务。

为源和检查点连接器启动的任务数量是 tasksMax 的最大任务数和 tasksMax 的值之间的较低值。对于源连接器,可能的最大任务数量是从源集群复制的每个分区。对于检查点连接器,可能的最大任务数都是从源集群复制的每个消费者组。在设置最多任务数量时,请考虑分区数量和支持进程的硬件资源。

如果基础架构支持处理开销,增加任务数量可以提高吞吐量和延迟。例如,当有大量分区或消费者组时,添加更多任务可减少轮询源集群的时间。

当您有大量分区时,增加检查点连接器的任务数量非常有用。

为源连接器增加任务数量

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      tasksMax: 10
  # ...

当您有大量消费者组时,增加检查点连接器的任务数量会很有用。

为检查点连接器增加任务数量

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    checkpointConnector:
      tasksMax: 10
  # ...

默认情况下,M MirrorMaker 2 每 10 分钟检查新的消费者组。您可以调整 refresh.groups.interval.seconds 配置以更改频率。调整降低时要小心。更频繁地检查会对性能造成负面影响。

2.4.4.1. 检查连接器任务操作

如果使用 Prometheus 和 Grafana 监控部署,您可以检查 MirrorMaker 2 性能。AMQ Streams 提供的 MirrorMaker 2 Grafana 仪表板示例显示了与任务和延迟相关的以下指标。

  • 任务数量
  • 复制延迟
  • 偏移同步延迟

2.4.5. ACL 规则同步

如果您不使用 User Operator,则可以对远程主题进行 ACL 访问。

如果使用 AclAuthorizer,但没有 User Operator,则管理对代理的访问的 ACL 规则也适用于远程主题。读取源主题的用户可以读取其远程的等效内容。

注意

OAuth 2.0 授权不支持以这种方式访问远程主题。

2.4.6. 配置 Kafka MirrorMaker 2

使用 KafkaMirrorMaker2 资源的属性来配置 Kafka MirrorMaker 2 部署。使用 MirrorMaker 2 在 Kafka 集群间同步数据。

配置必须指定:

  • 每个 Kafka 集群
  • 每个集群的连接信息,包括身份验证
  • 复制流和方向

    • 集群到集群
    • topic to topic
注意

以前版本的 MirrorMaker 仍被支持。如果要使用为之前的版本配置的资源,则必须将其更新为 MirrorMaker 2 支持的格式。

MirrorMaker 2 为复制因素等属性提供默认配置值。最小配置(默认值)保持不变,如下例所示:

MirrorMaker 2 的最小配置

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 3.4.0
  connectCluster: "my-cluster-target"
  clusters:
  - alias: "my-cluster-source"
    bootstrapServers: my-cluster-source-kafka-bootstrap:9092
  - alias: "my-cluster-target"
    bootstrapServers: my-cluster-target-kafka-bootstrap:9092
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector: {}

您可以使用 mTLS 或 SASL 身份验证为源和目标集群配置访问控制。此流程显示对源和目标集群使用 TLS 加密和 mTLS 身份验证的配置。

您可以在 KafkaMirrorMaker2 资源中指定您要从源集群复制的主题和消费者组。您可以使用 topicsPatterngroupsPattern 属性来执行此操作。您可以提供名称或使用正则表达式列表。默认情况下,如果您没有设置 topicsPatterngroupsPattern 属性,则会复制所有主题和消费者组。您还可以使用 ".*" 作为正则表达式来复制所有主题和消费者组。但是,尝试只指定您需要指定主题和消费者组,以避免在集群中造成不必要的额外负载。

处理大量信息

您可以调整配置来处理大量信息。如需更多信息,请参阅处理大量信息

先决条件

  • AMQ Streams 正在运行
  • 源和目标 Kafka 集群可用

流程

  1. 编辑 KafkaMirrorMaker2 资源的 spec 属性。

    您可以配置的属性显示在以下示例配置中:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mirror-maker2
    spec:
      version: 3.4.0 
    1
    
      replicas: 3 
    2
    
      connectCluster: "my-cluster-target" 
    3
    
      clusters: 
    4
    
      - alias: "my-cluster-source" 
    5
    
        authentication: 
    6
    
          certificateAndKey:
            certificate: source.crt
            key: source.key
            secretName: my-user-source
          type: tls
        bootstrapServers: my-cluster-source-kafka-bootstrap:9092 
    7
    
        tls: 
    8
    
          trustedCertificates:
          - certificate: ca.crt
            secretName: my-cluster-source-cluster-ca-cert
      - alias: "my-cluster-target" 
    9
    
        authentication: 
    10
    
          certificateAndKey:
            certificate: target.crt
            key: target.key
            secretName: my-user-target
          type: tls
        bootstrapServers: my-cluster-target-kafka-bootstrap:9092 
    11
    
        config: 
    12
    
          config.storage.replication.factor: 1
          offset.storage.replication.factor: 1
          status.storage.replication.factor: 1
          ssl.cipher.suites: TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 
    13
    
          ssl.enabled.protocols: TLSv1.2
          ssl.protocol: TLSv1.2
          ssl.endpoint.identification.algorithm: HTTPS 
    14
    
        tls: 
    15
    
          trustedCertificates:
          - certificate: ca.crt
            secretName: my-cluster-target-cluster-ca-cert
      mirrors: 
    16
    
      - sourceCluster: "my-cluster-source" 
    17
    
        targetCluster: "my-cluster-target" 
    18
    
        sourceConnector: 
    19
    
          tasksMax: 10 
    20
    
          autoRestart: 
    21
    
            enabled: true
          config:
            replication.factor: 1 
    22
    
            offset-syncs.topic.replication.factor: 1 
    23
    
            sync.topic.acls.enabled: "false" 
    24
    
            refresh.topics.interval.seconds: 60 
    25
    
            replication.policy.separator: "." 
    26
    
            replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy" 
    27
    
        heartbeatConnector: 
    28
    
          autoRestart:
            enabled: true
          config:
            heartbeats.topic.replication.factor: 1 
    29
    
        checkpointConnector: 
    30
    
          autoRestart:
            enabled: true
          config:
            checkpoints.topic.replication.factor: 1 
    31
    
            refresh.groups.interval.seconds: 600 
    32
    
            sync.group.offsets.enabled: true 
    33
    
            sync.group.offsets.interval.seconds: 60 
    34
    
            emit.checkpoints.interval.seconds: 60 
    35
    
            replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
        topicsPattern: "topic1|topic2|topic3" 
    36
    
        groupsPattern: "group1|group2|group3" 
    37
    
      resources: 
    38
    
        requests:
          cpu: "1"
          memory: 2Gi
        limits:
          cpu: "2"
          memory: 2Gi
      logging: 
    39
    
        type: inline
        loggers:
          connect.root.logger.level: "INFO"
      readinessProbe: 
    40
    
        initialDelaySeconds: 15
        timeoutSeconds: 5
      livenessProbe:
        initialDelaySeconds: 15
        timeoutSeconds: 5
      jvmOptions: 
    41
    
        "-Xmx": "1g"
        "-Xms": "1g"
      image: my-org/my-image:latest 
    42
    
      rack:
        topologyKey: topology.kubernetes.io/zone 
    43
    
      template: 
    44
    
        pod:
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: application
                        operator: In
                        values:
                          - postgresql
                          - mongodb
                  topologyKey: "kubernetes.io/hostname"
        connectContainer: 
    45
    
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing:
        type: jaeger 
    46
    
      externalConfiguration: 
    47
    
        env:
          - name: AWS_ACCESS_KEY_ID
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsAccessKey
          - name: AWS_SECRET_ACCESS_KEY
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsSecretAccessKey
    1
    Kafka Connect 和 Mirror Maker 2.0 版本,它们始终相同。
    2
    用于运行服务的 worker 的副本节点数量
    3
    Kafka Connect 的 Kafka 集群别名,它必须 指定目标 Kafka 集群。Kafka Connect 用于其内部主题。
    4
    正在同步的 Kafka 集群的规格
    5
    源 Kafka 集群的集群别名
    6
    源集群的身份验证,指定为 mTLS基于令牌的 OAuth、基于 SASL 的 SCRAM-SHA-256/SCRAM-SHA-512PLAIN
    7
    用于连接到源 Kafka 集群的 Bootstrap 服务器
    8
    TLS 加密,使用密钥名称,其中 TLS 证书存储为源 Kafka 集群的 X.509 格式。如果证书存储在同一 secret 中,它可以多次列出。
    9
    目标 Kafka 集群的集群别名
    10
    目标 Kafka 集群的验证配置方式与源 Kafka 集群相同。
    11
    用于连接目标 Kafka 集群的 Bootstrap 服务器
    12
    Kafka 连接配置。可以提供标准 Apache Kafka 配置,仅限于不受 AMQ Streams 直接管理的属性。
    13
    外部监听程序的 SSL 属性 使用 TLS 版本的特定 密码套件 运行。
    14
    通过设置为 HTTPS 来启用主机名验证。空字符串禁用验证。
    15
    目标 Kafka 集群的 TLS 加密与源 Kafka 集群相同。
    16
    17
    MirrorMaker 2 连接器使用的源集群的集群 别名
    18
    MirrorMaker 2 连接器使用的目标 集群的集群别名
    19
    创建远程主题的 MirrorSourceConnector 的配置。配置会覆盖 默认配置选项。
    20
    连接器可以创建的最大任务数量。任务处理数据复制并并行运行。如果基础架构支持处理开销,增加这个值可以提高吞吐量。Kafka Connect 在集群成员之间分发任务。如果任务数量超过 worker,则 worker 会被分配多个任务。对于接收器连接器,旨在为每个消耗的主题分区有一个任务。对于源连接器,并行运行的任务数量也可能依赖于外部系统。如果无法达到并行性,则连接器创建少于最大任务数量。
    21
    启用自动重启失败的连接器和任务。最多进行七次重启尝试,之后必须手动重新启动。
    22
    在目标集群中创建的镜像主题的复制因素。
    23
    MirrorSourceConnector offset-syncs 内部主题的复制因素,用于映射源和目标集群的偏移。
    24
    启用 ACL 规则同步后,会应用 ACL 来同步主题。默认值是 true。此功能与 User Operator 不兼容。如果使用 User Operator,请将此属性设置为 false
    25
    可选设置,以更改检查新主题的频率。默认值为每 10 分钟进行一次检查。
    26
    定义用于重命名远程主题的分隔符。
    27
    添加可覆盖远程主题自动重命名的策略。该主题不会用源集群的名称来附加名称,而是保留其原始名称。此可选设置可用于主动/被动备份和数据迁移。要配置主题偏移同步,还必须为 checkpointConnector.config 设置此属性。
    28
    执行连接 检查的 MirrorHeartbeatConnector 的配置。配置会覆盖 默认配置选项。
    29
    在目标集群中创建的 heartbeat 主题的复制因素。
    30
    跟踪偏移的 MirrorCheckpointConnector 的配置。配置会覆盖 默认配置选项。
    31
    在目标集群中创建的检查点主题的复制因素。
    32
    可选设置,用于更改新消费者组的检查频率。默认值为每 10 分钟进行一次检查。
    33
    可选设置来同步消费者组偏移,这对于在主动/被动配置中恢复非常有用。默认情况下不启用同步。
    34
    如果启用了使用者组偏移的同步,您可以调整同步的频率。
    35
    调整检查偏移跟踪的频率。如果您更改了偏移同步的频率,您可能需要调整这些检查的频率。
    36
    使用 以逗号分隔列表或正则表达式模式 定义的 源集群的主题复制。源连接器复制指定的主题。checkpoint 连接器跟踪指定主题的偏移。在这里,我们按名称请求三个主题。
    37
    定义为逗号分隔列表或正则表达式模式 的 源集群的消费者组复制。checkpoint 连接器复制指定的消费者组。在这里,我们按名称请求三个消费者组。
    38
    支持的资源、当前 cpu 和内存的 保留请求,以及指定可消耗的最大资源的限制。
    39
    指定 Kafka 连接日志记录器和日志级别 直接(内联)或通过 ConfigMap 间接(外部)添加。自定义 ConfigMap 必须放在 log4j.propertieslog4j2.properties 键下。对于 Kafka Connect log4j.rootLogger 日志记录器,您可以将日志级别设置为 INFO, ERROR, WARN, TRACE, DEBUG, FATAL 或 OFF。
    40
    使用 HealthCheck 可以知道何时重启一个容器(存活度)以及何时一个容器可以开始接受流量(就绪度)。
    41
    JVM 配置选项 优化运行 Kafka MirrorMaker 的虚拟机(VM)的性能。
    42
    ADVANCED OPTION: 容器镜像配置,这只在特殊情况下建议使用。
    43
    SPECIALIZED OPTION: Rack awareness 配置用于部署。这是用于在同一位置(而不是跨地区)部署的专用选项。如果您希望连接器从最接近的副本而不是领导副本使用,则使用这个选项。在某些情况下,从最接近的副本消耗可以提高网络利用率或降低成本。topologyKey 必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准 topology.kubernetes.io/zone 标签指定区。要从最接近的副本使用,请在 Kafka 代理配置中启用 RackAwareReplicaSelector
    44
    模板自定义.这里的 pod 使用反关联性调度,因此 pod 不会调度到具有相同主机名的节点。
    45
    为分布式追踪设置环境变量。
    46
    为 Jaeger 启用分布式追踪。
    47
    挂载到 Kafka MirrorMaker 的 OpenShift Secret 的外部配置 作为环境变量。您还可以使用 配置供应商插件 来加载来自外部来源的配置值
  2. 创建或更新资源:

    oc apply -f MIRRORMAKER-CONFIGURATION-FILE

2.4.7. 保护 Kafka MirrorMaker 2 部署

此流程概述了保护 MirrorMaker 2 部署所需的配置。

您需要为源 Kafka 集群和目标 Kafka 集群进行单独的配置。您还需要单独的用户配置来提供 MirrorMaker 连接到源和目标 Kafka 集群所需的凭证。

对于 Kafka 集群,您可以为 OpenShift 集群内的安全连接指定内部监听程序,为 OpenShift 集群外部的连接指定外部监听程序。

您可以配置身份验证和授权机制。为源和目标 Kafka 集群实施的安全选项必须与 MirrorMaker 2 实施的安全选项兼容。

创建集群和用户身份验证凭据后,您可以在 MirrorMaker 配置中指定它们进行安全连接。

注意

在此过程中,使用 Cluster Operator 生成的证书,但 您可以通过安装自己的证书 来替换它们。您还可以将监听程序 配置为使用由外部 CA (证书颁发机构)管理的 Kafka 侦听器证书

开始前

在开始此步骤前,请查看 AMQ Streams 提供 的示例配置文件。它们包括使用 mTLS 或 SCRAM-SHA-512 身份验证来保护 MirrorMaker 2 部署的示例。示例指定用于在 OpenShift 集群内连接的内部监听程序。

示例提供了完整的授权配置,包括 MirrorMaker 2 所需的所有 ACL,以允许源和目标 Kafka 集群上的操作。

先决条件

  • AMQ Streams 正在运行
  • 源和目标集群的独立命名空间

此流程假设源和目标 Kafka 集群被安装到单独的命名空间(如果需要使用 Topic Operator),则需要这样做。Topic Operator 只监视指定命名空间中的单个集群。

通过将集群分隔到命名空间中,您需要复制集群 secret,以便可以在命名空间外访问它们。您需要在 MirrorMaker 配置中引用 secret。

流程

  1. 配置两个 Kafka 资源,一个用于保护源 Kafka 集群,另一个用于保护目标 Kafka 集群。

    您可以为身份验证和授权添加监听程序配置。

    在本例中,为带有 TLS 加密和 mTLS 验证的 Kafka 集群配置内部监听程序。Kafka 简单 授权已启用。

    使用 TLS 加密和 mTLS 验证的源 Kafka 集群配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-source-cluster
    spec:
      kafka:
        version: 3.4.0
        replicas: 1
        listeners:
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication:
              type: tls
        authorization:
          type: simple
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
          inter.broker.protocol.version: "3.4"
        storage:
          type: jbod
          volumes:
          - id: 0
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
      zookeeper:
        replicas: 1
        storage:
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
      entityOperator:
        topicOperator: {}
        userOperator: {}

    使用 TLS 加密和 mTLS 身份验证的目标 Kafka 集群配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-target-cluster
    spec:
      kafka:
        version: 3.4.0
        replicas: 1
        listeners:
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication:
              type: tls
        authorization:
          type: simple
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
          inter.broker.protocol.version: "3.4"
        storage:
          type: jbod
          volumes:
            - id: 0
              type: persistent-claim
              size: 100Gi
              deleteClaim: false
      zookeeper:
        replicas: 1
        storage:
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
      entityOperator:
        topicOperator: {}
        userOperator: {}

  2. 在单独的命名空间中创建或更新 Kafka 资源。

    oc apply -f <kafka_configuration_file> -n <namespace>

    Cluster Operator 创建监听程序并设置集群和客户端证书颁发机构(CA)证书,以便在 Kafka 集群中启用身份验证。

    证书在 secret < cluster_name> -cluster-ca-cert 中创建。

  3. 配置两个 KafkaUser 资源,一个用于源 Kafka 集群的用户,另一个用于目标 Kafka 集群的用户。

    1. 将相同的身份验证和授权类型配置为对应的源和目标 Kafka 集群。例如,如果您在源 Kafka 集群的 Kafka 配置中使用了 tls 验证和 simple 授权类型,请在 KafkaUser 配置中使用相同的。
    2. 配置 MirrorMaker 2 所需的 ACL,以允许源和目标 Kafka 集群上的操作。

      ACL 由内部 MirrorMaker 连接器和底层 Kafka 连接框架使用。

    mTLS 验证的源用户配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-source-user
      labels:
        strimzi.io/cluster: my-source-cluster
    spec:
      authentication:
        type: tls
      authorization:
        type: simple
        acls:
          # MirrorSourceConnector
          - resource: # Not needed if offset-syncs.topic.location=target
              type: topic
              name: mm2-offset-syncs.my-target-cluster.internal
            operations:
              - Create
              - DescribeConfigs
              - Read
              - Write
          - resource: # Needed for every topic which is mirrored
              type: topic
              name: "*"
            operations:
              - DescribeConfigs
              - Read
          # MirrorCheckpointConnector
          - resource:
              type: cluster
            operations:
              - Describe
          - resource: # Needed for every group for which offsets are synced
              type: group
              name: "*"
            operations:
              - Describe
          - resource: # Not needed if offset-syncs.topic.location=target
              type: topic
              name: mm2-offset-syncs.my-target-cluster.internal
            operations:
              - Read

    mTLS 身份验证的目标用户配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-target-user
      labels:
        strimzi.io/cluster: my-target-cluster
    spec:
      authentication:
        type: tls
      authorization:
        type: simple
        acls:
          # Underlying Kafka Connect internal topics to store configuration, offsets, or status
          - resource:
              type: group
              name: mirrormaker2-cluster
            operations:
              - Read
          - resource:
              type: topic
              name: mirrormaker2-cluster-configs
            operations:
              - Create
              - Describe
              - DescribeConfigs
              - Read
              - Write
          - resource:
              type: topic
              name: mirrormaker2-cluster-status
            operations:
              - Create
              - Describe
              - DescribeConfigs
              - Read
              - Write
          - resource:
              type: topic
              name: mirrormaker2-cluster-offsets
            operations:
              - Create
              - Describe
              - DescribeConfigs
              - Read
              - Write
          # MirrorSourceConnector
          - resource: # Needed for every topic which is mirrored
              type: topic
              name: "*"
            operations:
              - Create
              - Alter
              - AlterConfigs
              - Write
          # MirrorCheckpointConnector
          - resource:
              type: cluster
            operations:
              - Describe
          - resource:
              type: topic
              name: my-source-cluster.checkpoints.internal
            operations:
              - Create
              - Describe
              - Read
              - Write
          - resource: # Needed for every group for which the offset is synced
              type: group
              name: "*"
            operations:
              - Read
              - Describe
          # MirrorHeartbeatConnector
          - resource:
              type: topic
              name: heartbeats
            operations:
              - Create
              - Describe
              - Write

    注意

    您可以通过将 type 设置为 tls-external 来使用在 User Operator 外部发布的证书。如需更多信息,请参阅 第 6.2.93 节 “KafkaUserSpec 模式参考”

  4. 在您为源和目标集群创建的每个命名空间中创建或更新 KafkaUser 资源。

    oc apply -f <kafka_user_configuration_file> -n <namespace>

    根据所选的验证类型,User Operator 会创建代表客户端(MirrorMaker)的用户,以及用于客户端身份验证的安全凭证。

    User Operator 会创建一个与 KafkaUser 资源相同的新 secret。secret 包含用于 mTLS 验证的私钥和公钥。公钥包含在用户证书中,证书由客户端 CA 签名。

  5. 使用身份验证详情配置 KafkaMirrorMaker2 资源,以连接到源和目标集群。

    带有 TLS 加密和 mTLS 身份验证的 MirrorMaker 2 配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mirror-maker-2
    spec:
      version: 3.4.0
      replicas: 1
      connectCluster: "my-target-cluster"
      clusters:
        - alias: "my-source-cluster"
          bootstrapServers: my-source-cluster-kafka-bootstrap:9093
          tls: 
    1
    
            trustedCertificates:
              - secretName: my-source-cluster-cluster-ca-cert
                certificate: ca.crt
          authentication: 
    2
    
            type: tls
            certificateAndKey:
              secretName: my-source-user
              certificate: user.crt
              key: user.key
        - alias: "my-target-cluster"
          bootstrapServers: my-target-cluster-kafka-bootstrap:9093
          tls: 
    3
    
            trustedCertificates:
              - secretName: my-target-cluster-cluster-ca-cert
                certificate: ca.crt
          authentication: 
    4
    
            type: tls
            certificateAndKey:
              secretName: my-target-user
              certificate: user.crt
              key: user.key
          config:
            # -1 means it will use the default replication factor configured in the broker
            config.storage.replication.factor: -1
            offset.storage.replication.factor: -1
            status.storage.replication.factor: -1
      mirrors:
        - sourceCluster: "my-source-cluster"
          targetCluster: "my-target-cluster"
          sourceConnector:
            config:
              replication.factor: 1
              offset-syncs.topic.replication.factor: 1
              sync.topic.acls.enabled: "false"
          heartbeatConnector:
            config:
              heartbeats.topic.replication.factor: 1
          checkpointConnector:
            config:
              checkpoints.topic.replication.factor: 1
              sync.group.offsets.enabled: "true"
          topicsPattern: "topic1|topic2|topic3"
          groupsPattern: "group1|group2|group3"

    1
    源 Kafka 集群的 TLS 证书。如果它们位于单独的命名空间中,请从 Kafka 集群的命名空间中复制集群 secret。
    2
    使用 TLS 机制 访问源 Kafka 集群的用户身份验证。
    3
    目标 Kafka 集群的 TLS 证书。
    4
    访问目标 Kafka 集群的用户身份验证。
  6. 在与目标 Kafka 集群相同的命名空间中创建或更新 KafkaMirrorMaker2 资源。

    oc apply -f <mirrormaker2_configuration_file> -n <namespace_of_target_cluster>

2.4.8. 执行 Kafka MirrorMaker 2 连接器重启

此流程描述了如何使用 OpenShift 注解手动触发 Kafka MirrorMaker 2 连接器的重启。

先决条件

  • Cluster Operator 正在运行。

流程

  1. 查找控制您要重启的 Kafka MirrorMaker 2 连接器的 KafkaMirrorMaker2 自定义资源的名称:

    oc get KafkaMirrorMaker2
  2. 查找要从 KafkaMirrorMaker2 自定义资源重启的 Kafka MirrorMaker 2 连接器的名称。

    oc describe KafkaMirrorMaker2 KAFKAMIRRORMAKER-2-NAME
  3. 要重启连接器,请在 OpenShift 中注解 KafkaMirrorMaker2 资源。在本例中,oc annotate 重启一个名为 my-source->my-target.MirrorSourceConnector 的连接器:

    oc annotate KafkaMirrorMaker2 KAFKAMIRRORMAKER-2-NAME "strimzi.io/restart-connector=my-source->my-target.MirrorSourceConnector"
  4. 等待下一个协调发生(默认为两分钟)。

    只要协调过程检测到注解,Kafka MirrorMaker 2 连接器会被重启。当重启请求被接受时,该注解会从 KafkaMirrorMaker2 自定义资源中删除。

2.4.9. 执行 Kafka MirrorMaker 2 连接器任务的重启

此流程描述了如何使用 OpenShift 注解手动触发 Kafka MirrorMaker 2 连接器任务的重启。

先决条件

  • Cluster Operator 正在运行。

流程

  1. 查找控制您要重启的 Kafka MirrorMaker 2 连接器的 KafkaMirrorMaker2 自定义资源的名称:

    oc get KafkaMirrorMaker2
  2. 查找 Kafka MirrorMaker 2 连接器的名称以及从 KafkaMirrorMaker2 自定义资源重启的任务 ID。任务 ID 是非负的整数,从 0 开始。

    oc describe KafkaMirrorMaker2 KAFKAMIRRORMAKER-2-NAME
  3. 要重启连接器任务,请在 OpenShift 中注解 KafkaMirrorMaker2 资源。在本例中,oc annotate restart task 0 of a connector named my-source->my-target.MirrorSourceConnector:

    oc annotate KafkaMirrorMaker2 KAFKAMIRRORMAKER-2-NAME "strimzi.io/restart-connector-task=my-source->my-target.MirrorSourceConnector:0"
  4. 等待下一个协调发生(默认为两分钟)。

    只要协调过程检测到注解,Kafka MirrorMaker 2 连接器任务会被重启。当 restart 任务请求时,注解会从 KafkaMirrorMaker2 自定义资源中删除。

Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2026 Red Hat
返回顶部