9.9. 配置 Kafka MirrorMaker 2


更新 KafkaMirrorMaker2 自定义资源的 spec 属性,以配置 MirrorMaker 2 部署。MirrorMaker 2 使用源集群配置进行数据消耗和目标集群配置进行数据输出。

MirrorMaker 2 基于 Kafka Connect 框架,连接器 管理集群之间的数据传输。

您可以配置 MirrorMaker 2 以定义 Kafka Connect 部署,包括源和目标集群的连接详情,然后运行 MirrorMaker 2 连接器的集合来进行连接。

MirrorMaker 2 支持源和目标集群之间的主题配置同步。您可以在 MirrorMaker 2 配置中指定源主题。MirrorMaker 2 监控源主题。MirrorMaker 2 会检测并将更改传播到源主题到远程主题。更改可能包括自动创建缺少的主题和分区。

注意

在大多数情况下,您写入本地主题并从远程主题读取。虽然对远程主题不阻止写操作,但应该避免使用它们。

配置必须指定:

  • 每个 Kafka 集群
  • 每个集群的连接信息,包括身份验证
  • 复制流和方向

    • 集群到集群
    • topic 的主题

要深入了解 Kafka MirrorMaker 2 集群配置选项,请参阅 自定义资源 API 参考

注意

MirrorMaker 2 资源配置与之前的 MirrorMaker 版本不同,它现已弃用。当前不支持旧支持,因此任何资源都必须手动转换为新格式。

默认配置

MirrorMaker 2 为复制因素等属性提供默认配置值。最小配置保持不变,默认值如下:

MirrorMaker 2 的最小配置

Copy to Clipboard Toggle word wrap
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 3.8.0
  connectCluster: "my-cluster-target"
  clusters:
  - alias: "my-cluster-source"
    bootstrapServers: my-cluster-source-kafka-bootstrap:9092
  - alias: "my-cluster-target"
    bootstrapServers: my-cluster-target-kafka-bootstrap:9092
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector: {}

您可以使用 mTLS 或 SASL 身份验证为源和目标集群配置访问控制。此流程演示了如何为源和目标集群使用 TLS 加密和 mTLS 身份验证的配置。

您可以在 KafkaMirrorMaker2 资源中指定您要从源集群复制的主题和消费者组。您可以使用 topicsPatterngroupsPattern 属性进行此操作。您可以提供名称列表或使用正则表达式。默认情况下,如果您未设置 topicsPatterngroupsPattern 属性,则会复制所有主题和消费者组。您可以使用 ".*" 正则表达式来复制所有主题和消费者组。但是,尝试只指定您需要指定主题和消费者组,以避免在集群中造成不必要的额外负载。

处理大量信息

您可以调整配置以处理大量信息。如需更多信息,请参阅 处理大量信息

KafkaMirrorMaker2 自定义资源配置示例

Copy to Clipboard Toggle word wrap
# Basic configuration (required)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
# Deployment specifications
spec:
  # Replicas (required)
  replicas: 3 
1

  # Connect cluster name (required)
  connectCluster: "my-cluster-target" 
2

  # Cluster configurations (required)
  clusters: 
3

    - alias: "my-cluster-source" 
4

      # Authentication (optional)
      authentication: 
5

        certificateAndKey:
          certificate: source.crt
          key: source.key
          secretName: my-user-source
        type: tls
      bootstrapServers: my-cluster-source-kafka-bootstrap:9092 
6

      # TLS configuration (optional)
      tls: 
7

        trustedCertificates:
          - pattern: "*.crt"
            secretName: my-cluster-source-cluster-ca-cert
    - alias: "my-cluster-target" 
8

      # Authentication (optional)
      authentication: 
9

        certificateAndKey:
          certificate: target.crt
          key: target.key
          secretName: my-user-target
        type: tls
      bootstrapServers: my-cluster-target-kafka-bootstrap:9092 
10

      # Kafka Connect configuration (optional)
      config: 
11

        config.storage.replication.factor: 1
        offset.storage.replication.factor: 1
        status.storage.replication.factor: 1
      # TLS configuration (optional)
      tls: 
12

        trustedCertificates:
          - pattern: "*.crt"
            secretName: my-cluster-target-cluster-ca-cert
  # Mirroring configurations (required)
  mirrors: 
13

    - sourceCluster: "my-cluster-source" 
14

      targetCluster: "my-cluster-target" 
15

      # Topic and group patterns (required)
      topicsPattern: "topic1|topic2|topic3" 
16

      groupsPattern: "group1|group2|group3" 
17

      # Source connector configuration (required)
      sourceConnector: 
18

        tasksMax: 10 
19

        autoRestart: 
20

          enabled: true
        config:
          replication.factor: 1 
21

          offset-syncs.topic.replication.factor: 1 
22

          sync.topic.acls.enabled: "false" 
23

          refresh.topics.interval.seconds: 60 
24

          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy" 
25

      # Heartbeat connector configuration (optional)
      heartbeatConnector: 
26

        autoRestart:
          enabled: true
        config:
          heartbeats.topic.replication.factor: 1 
27

          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
      # Checkpoint connector configuration (optional)
      checkpointConnector: 
28

        autoRestart:
          enabled: true
        config:
          checkpoints.topic.replication.factor: 1 
29

          refresh.groups.interval.seconds: 600 
30

          sync.group.offsets.enabled: true 
31

          sync.group.offsets.interval.seconds: 60 
32

          emit.checkpoints.interval.seconds: 60 
33

          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
  # Kafka version (recommended)
  version: 3.8.0 
34

  # Resources requests and limits (recommended)
  resources: 
35

    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi
  # Logging configuration (optional)
  logging: 
36

    type: inline
    loggers:
      connect.root.logger.level: INFO
  # Readiness probe (optional)
  readinessProbe: 
37

    initialDelaySeconds: 15
    timeoutSeconds: 5
  # Liveness probe (optional)
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 5
  # JVM options (optional)
  jvmOptions: 
38

    "-Xmx": "1g"
    "-Xms": "1g"
  # Custom image (optional)
  image: my-org/my-image:latest 
39

  # Rack awareness (optional)
  rack:
    topologyKey: topology.kubernetes.io/zone 
40

  # Pod template (optional)
  template: 
41

    pod:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: application
                    operator: In
                    values:
                      - postgresql
                      - mongodb
              topologyKey: "kubernetes.io/hostname"
    connectContainer: 
42

      env:
        - name: OTEL_SERVICE_NAME
          value: my-otel-service
        - name: OTEL_EXPORTER_OTLP_ENDPOINT
          value: "http://otlp-host:4317"
  # Tracing configuration (optional)
  tracing:
    type: opentelemetry 
43

1
运行任务的 worker 的副本节点数量。
2
Kafka Connect 的 Kafka 集群别名,它必须 指定目标 Kafka 集群。Kafka Connect 使用 Kafka 集群用于其内部主题。
3
正在同步的 Kafka 集群的规格。
4
源 Kafka 集群的集群别名。
5
源集群的身份验证,指定为 mTLS、基于令牌的 OAuth、基于 SASL 的 SCRAM-SHA-256/SCRAM-SHA-512 或 PLAIN。
6
连接到源 Kafka 集群的 bootstrap 地址。该地址采用 < cluster_name>-kafka-bootstrap:<port_number> 格式。Kafka 集群不需要由 Streams for Apache Kafka 管理,或部署到 Kubernetes 集群。
7
用于加密到 Kafka 集群的 TLS 配置,可信证书存储在指定 secret 中的 X.509 格式。
8
目标 Kafka 集群的集群别名。
9
目标 Kafka 集群的身份验证配置方式与源 Kafka 集群相同。
10
连接到目标 Kafka 集群的 bootstrap 地址。该地址采用 < cluster_name>-kafka-bootstrap:<port_number> 格式。Kafka 集群不需要由 Streams for Apache Kafka 管理,或部署到 Kubernetes 集群。
11
Kafka Connect 配置。标准 Apache Kafka 配置可能会提供,仅限于不直接由 Apache Kafka 的 Streams 管理的属性。
12
目标 Kafka 集群的 TLS 加密配置方式与源 Kafka 集群相同。
13
MirrorMaker 2 连接器。
14
MirrorMaker 2 连接器使用的源集群的集群别名。
15
MirrorMaker 2 连接器使用的目标集群的集群别名。
16
定义为以逗号分隔的列表或正则表达式模式的源集群的主题复制。源连接器复制指定的主题。checkpoint 连接器跟踪指定主题的偏移。此处我们按名称请求三个主题。
17
从以逗号分隔列表或正则表达式模式定义的源集群的消费者组复制。checkpoint 连接器复制指定的消费者组。此处我们按名称请求三个消费者组。
18
MirrorSourceConnector 的配置,用于创建远程主题。配置会覆盖 默认配置选项。
19
连接器可创建的最大任务数量。任务处理数据复制并并行运行。如果基础架构支持处理开销,增加这个值可以提高吞吐量。Kafka Connect 在集群成员间分发任务。如果任务数量超过 worker,则 worker 会被分配多个任务。对于 sink 连接器,旨在为每个主题分区消耗一个任务。对于源连接器,可以并行运行的任务数量也可能依赖于外部系统。如果无法实现并行性,则连接器会创建少于最大任务数。
20
启用自动重启失败的连接器和任务。默认情况下,重启数量是无限的,但您可以使用 maxRestarts 属性设置自动重启次数的最大值。
21
在目标集群中创建的镜像主题的复制因素。
22
MirrorSourceConnector offset-syncs 内部主题的复制因素,用于映射源和目标集群的偏移。
23
启用 ACL 规则同步后,会将 ACL 应用到同步主题。默认值是 true。此功能与 User Operator 不兼容。如果使用 User Operator,请将此属性设置为 false
24
可选设置,用于更改新主题的检查频率。默认值为每 10 分钟进行一次检查。
25
添加可覆盖远程主题自动重命名的策略。该主题不会用源集群的名称来附加名称,而是保留其原始名称。此可选设置可用于主动/被动备份和数据迁移。必须为所有连接器指定属性。对于双向(active/active)复制,请使用 DefaultReplicationPolicy 类自动重命名远程主题,并为所有连接器指定 replication.policy.separator 属性来添加自定义分隔符。
26
执行连接检查的 MirrorHeartbeatConnector 的配置。配置会覆盖 默认配置选项。
27
在目标集群中创建的心跳主题的复制因素。
28
用于跟踪偏移的 MirrorCheckpointConnector 的配置。配置会覆盖 默认配置选项。
29
在目标集群中创建的检查点主题的复制因素。
30
可选设置,用于更改新消费者组的检查频率。默认值为每 10 分钟进行一次检查。
31
用于同步消费者组偏移的可选设置,这对于在主动/被动配置中恢复非常有用。默认不启用同步。
32
如果启用了使用者组偏移的同步,您可以调整同步的频率。
33
调整检查偏移跟踪的频率。如果您更改了偏移同步的频率,您可能需要调整这些检查的频率。
34
Kafka Connect 和 MirrorMaker 2 版本,它们始终相同。
35
为保留支持的资源(当前 cpumemory )的请求,以及指定可消耗的最大资源的限制。
36
指定 Kafka Connect 日志记录器和日志级别直接(内联)或通过 ConfigMap 间接(外部)。自定义 Log4j 配置必须放在 ConfigMap 中的 log4j.propertieslog4j2.properties 键下。对于 Kafka Connect log4j.rootLogger 日志记录器,您可以将日志级别设置为 INFO, ERROR, WARN, TRACE, DEBUG,FATAL 或 OFF。
37
检查检查以了解何时重启容器(存活度)以及何时容器可以接受流量(就绪度)。
38
JVM 配置选项,用于优化运行 Kafka MirrorMaker 的虚拟机(VM)的性能。
39
ProductShortName OPTION: 容器镜像配置,仅在特殊情况下推荐使用。
40
SPECIALIZED OPTION:部署的机架感知配置。这是用于在同一位置(而非跨地区)部署的专用选项。如果您希望连接器从最接近的副本而不是领导副本使用,则使用此选项。在某些情况下,使用来自最接近的副本的消耗可以提高网络利用率或降低成本。topologyKey 必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准 topology.kubernetes.io/zone 标签指定区。要从最接近的副本使用,请在 Kafka 代理配置中启用 RackAwareReplicaSelector
41
模板自定义。此处的 pod 使用反关联性调度,因此 pod 不会调度到具有相同主机名的节点。
42
为分布式追踪设置环境变量。
43
使用 OpenTelemetry 启用分布式追踪。

9.9.1. 配置主动/主动或主动/被动模式

您可以在主动/被动主动/主动集群配置中使用 MirrorMaker 2。

主动/主动集群配置
主动/主动配置有两个主动集群双向复制数据。应用程序可以使用任一集群。每个集群都可以提供相同的数据。这样,您可以在不同的地理位置提供相同的数据。因为消费者组在两个集群中都活跃,复制主题的使用者偏移不会重新同步到源集群。
主动/被动集群配置
主动/被动配置具有主动集群将数据复制到被动集群。被动集群保持在待机状态。在出现系统失败时,您可以使用被动集群进行数据恢复。

预期的结构是,生成者和消费者仅连接到活跃集群。每个目标目的地都需要一个 MirrorMaker 2 集群。

9.9.1.1. 双向复制(主动/主动)

MirrorMaker 2 架构支持 主动/主动集群配置中 的双向复制。

每个集群使用 sourceremote 主题的概念复制其他集群的数据。由于同一主题存储在每个集群中,因此远程主题由 MirrorMaker 2 自动重命名,以代表源集群。原始集群的名称前面是主题名称的前面。

图 9.1. 主题重命名

MirrorMaker 2 双向架构

通过标记原始集群,主题不会复制到该集群。

在配置需要数据聚合的架构时,通过 远程主题 复制的概念非常有用。消费者可以订阅同一集群中的源和目标主题,而无需单独的聚合集群。

9.9.1.2. 单向复制(主动/被动)

MirrorMaker 2 架构支持 主动/被动集群 配置中的单向复制。

您可以使用 主动/被动集群 配置来备份或将数据迁移到另一个集群。在这种情况下,您可能不希望自动重命名远程主题。

您可以通过将 IdentityReplicationPolicy 添加到源连接器配置来覆盖自动重命名。应用此配置后,主题会保留其原始名称。

9.9.2. 为多个实例配置 MirrorMaker 2

默认情况下,Apache Kafka 的 Streams 配置 MirrorMaker 2 运行的 Kafka Connect 框架使用的内部主题的组 ID 和名称。当运行多个 MirrorMaker 2 实例时,它们共享相同的 connectCluster 值,您必须使用以下配置属性更改 这些默认设置

Copy to Clipboard Toggle word wrap
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  connectCluster: "my-cluster-target"
  clusters:
  - alias: "my-cluster-target"
    config:
      group.id: my-connect-cluster 
1

      offset.storage.topic: my-connect-cluster-offsets 
2

      config.storage.topic: my-connect-cluster-configs 
3

      status.storage.topic: my-connect-cluster-status 
4

      # ...
    # ...
1
Kafka 中的 Kafka Connect 集群组 ID。
2
存储连接器偏移的 Kafka 主题。
3
存储连接器和任务状态配置的 Kafka 主题。
4
存储连接器和任务状态更新的 Kafka 主题。
注意

对于具有相同 group.id 的所有实例,三个主题的值必须相同。

connectCluster 设置指定 Kafka Connect 用于其内部主题的目标 Kafka 集群的别名。因此,对 connectCluster、组 ID 和内部主题命名配置进行修改特定于目标 Kafka 集群。如果两个 MirrorMaker 2 实例使用相同的源 Kafka 集群或主动-主动模式,则不需要进行更改,其中每个 MirrorMaker 2 实例都有不同的 connectCluster 设置和目标集群。

但是,如果多个 MirrorMaker 2 实例共享相同的 connectCluster,则每个连接到同一目标 Kafka 集群的实例都会使用相同的值部署。在实践中,这意味着所有实例组成一个集群并使用相同的内部主题。

尝试使用同一内部主题的多个实例将导致意外错误,因此您必须更改每个实例的这些属性值。

9.9.3. 配置 MirrorMaker 2 连接器

将 MirrorMaker 2 连接器配置用于编配 Kafka 集群之间的数据同步的内部连接器。

MirrorMaker 2 由以下连接器组成:

MirrorSourceConnector
源连接器将主题从源集群复制到目标集群。它还复制 ACL,且是 MirrorCheckpointConnector 才能运行所必需的。
MirrorCheckpointConnector
checkpoint 连接器会定期跟踪偏移。如果启用,它还在源和目标集群之间同步消费者组偏移。
MirrorHeartbeatConnector
heartbeat 连接器会定期检查源和目标集群之间的连接。

下表描述了连接器属性以及您配置为使用它们的连接器。

表 9.3. MirrorMaker 2 连接器配置属性
属性sourceConnectorcheckpointConnectorheartbeatConnector
admin.timeout.ms
管理任务的超时,如检测新主题。默认值为 60000 (1 分钟)。

replication.policy.class
定义远程主题命名约定的策略。默认为 org.apache.kafka.connect.mirror.DefaultReplicationPolicy

replication.policy.separator
在目标集群中用于主题命名的分隔符。默认情况下,分隔符设置为点(.)。分隔符配置仅适用于 DefaultReplicationPolicy 复制策略类,用于定义远程主题名称。IdentityReplicationPolicy 类不使用属性,因为主题会保留其原始名称。

consumer.poll.timeout.ms
轮询源集群时超时。默认值为 1000 (1 秒)。

 
offset-syncs.topic.location
offset-syncs 主题的位置,可以是 (默认)或 目标集群

 
topic.filter.class
选择要复制的主题的主题过滤器。默认为 org.apache.kafka.connect.mirror.DefaultTopicFilter

 
config.property.filter.class
用于选择要复制的主题配置属性的主题过滤器。默认为 org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter

  
config.properties.exclude
不应复制的主题配置属性。支持以逗号分隔的属性名称和正则表达式。

  
offset.lag.max
在同步远程分区前,最大允许(不同步)偏移滞后。默认值为 100

  
offset-syncs.topic.replication.factor
内部 offset-syncs 主题的复制因素。默认值为 3

  
refresh.topics.enabled
启用检查新主题和分区。默认为 true

  
refresh.topics.interval.seconds
主题刷新的频率。默认为 600 (10 分钟)。默认情况下,检查源集群中的新主题每 10 分钟进行一次。您可以通过在源连接器配置中添加 refresh.topics.interval.seconds 来更改频率。

  
replication.factor
新主题的复制因素。默认值为 2

  
sync.topic.acls.enabled
启用从源集群同步 ACL。默认为 true。如需更多信息,请参阅 第 9.9.6 节 “为远程主题同步 ACL 规则”

  
sync.topic.acls.interval.seconds
ACL 同步的频率。默认为 600 (10 分钟)。

  
sync.topic.configs.enabled
启用从源集群同步主题配置。默认为 true

  
sync.topic.configs.interval.seconds
主题配置同步的频率。默认 600 (10 分钟)。

  
checkpoints.topic.replication.factor
内部 检查点 主题的复制因素。默认值为 3
 

 
emit.checkpoints.enabled
启用将消费者偏移同步到目标集群。默认为 true
 

 
emit.checkpoints.interval.seconds
消费者偏移同步的频率。默认值为 60 (1 分钟)。
 

 
group.filter.class
组过滤器,以选择要复制的消费者组。默认为 org.apache.kafka.connect.mirror.DefaultGroupFilter
 

 
refresh.groups.enabled
启用检查新的消费者组。默认为 true
 

 
refresh.groups.interval.seconds
消费者组刷新的频率。默认为 600 (10 分钟)。
 

 
sync.group.offsets.enabled
启用将消费者组偏移同步到目标集群 __consumer_offsets 主题。默认为 false
 

 
sync.group.offsets.interval.seconds
消费者组偏移同步的频率。默认值为 60 (1 分钟)。
 

 
emit.heartbeats.enabled
在目标集群中启用连接检查。默认为 true
  

emit.heartbeats.interval.seconds
连接检查的频率。默认为 1 ( 1 秒)。
  

heartbeats.topic.replication.factor
内部 心跳 主题的复制因素。默认值为 3
  

9.9.3.1. 更改消费者组偏移主题的位置

MirrorMaker 2 使用内部主题跟踪消费者组的偏移。

offset-syncs 主题
offset-syncs 主题映射复制主题元数据的源和目标偏移。
checkpoints 主题
checkpoints 主题映射源和目标集群中每个消费者组中复制的主题分区的最后提交偏移量。

因为它们被 MirrorMaker 2 内部使用,所以您不会直接与这些主题交互。

MirrorCheckpointConnector 为偏移跟踪发出 检查点checkpoints 主题的偏移通过配置以预先确定的间隔进行跟踪。这两个主题都允许从故障转移上的正确偏移位置完全恢复复制。

offset-syncs 主题的位置是 源集群。您可以使用 offset-syncs.topic.location 连接器配置将其更改为 目标集群。您需要对包含该主题的集群进行读/写访问。使用目标集群作为 offset-syncs 主题的位置,您也可以使用 MirrorMaker 2,即使您只有对源集群的读访问权限。

9.9.3.2. 同步消费者组偏移

__consumer_offsets 主题存储各个消费者组的提交偏移信息。偏移同步会定期将源集群的消费者组的使用者偏移转移到目标集群的使用者偏移量中。

偏移同步在主动/被动配置中特别有用。如果主动集群停机,消费者应用程序可以切换到被动(standby)集群,并从最后一个传输的偏移位置获取。

要使用主题偏移同步,请通过将 sync.group.offsets.enabled 添加到检查点连接器配置来启用同步,并将属性设置为 true。默认情况下禁用同步。

在源连接器中使用 IdentityReplicationPolicy 时,还必须在检查点连接器配置中进行配置。这样可确保为正确的主题应用镜像的消费者偏移。

消费者偏移仅针对目标集群中未激活的消费者组同步。如果消费者组位于目标集群中,则无法执行同步,并返回 UNKNOWN_MEMBER_ID 错误。

如果启用,则会定期从源集群同步偏移。您可以通过在检查点连接器配置中添加 sync.group.offsets.interval.secondsemit.checkpoints.interval.seconds 来更改频率。属性指定同步消费者组偏移的频率,以及为偏移跟踪发送检查点的频率。这两个属性的默认值为 60 秒。您还可以使用 refresh.groups.interval.seconds 属性更改检查新消费者组的频率,该属性默认为每 10 分钟执行。

由于同步基于时间,因此消费者到被动集群的任何切换都可能会导致一些消息重复。

注意

如果您有使用 Java 编写的应用程序,您可以使用 RemoteClusterUtils.java 工具通过应用同步偏移。实用程序从 checkpoints 主题获取消费者组的远程偏移。

9.9.3.3. 决定使用 heartbeat 连接器的时间

heartbeat 连接器发出心跳来检查源和目标 Kafka 集群之间的连接。内部 心跳 主题从源集群复制,这意味着 heartbeat 连接器必须连接到源集群。heartbeat 主题位于目标集群上,它允许它执行以下操作:

  • 识别它要从中镜像数据的所有源集群
  • 验证镜像进程的存活度和延迟

这有助于确保进程不会因为任何原因而卡住或已停止。虽然 heartbeat 连接器是监控 Kafka 集群之间的镜像进程的有价值的工具,但并非总是需要使用它。例如,如果您的部署具有低网络延迟或少量主题,您可能需要使用日志消息或其他监控工具来监控镜像过程。如果您决定不使用 heartbeat 连接器,只需从 MirrorMaker 2 配置中省略它。

9.9.3.4. 对齐 MirrorMaker 2 连接器的配置

为确保 MirrorMaker 2 连接器正常工作,请确保在连接器之间保持一致某些配置设置。具体来说,请确保以下属性在所有适用的连接器中具有相同的值:

  • replication.policy.class
  • replication.policy.separator
  • offset-syncs.topic.location
  • topic.filter.class

例如,source、检查点和 heartbeat 连接器的 replication.policy.class 的值必须相同。不匹配或缺失的设置会导致数据复制或偏移同步出现问题,因此必须使用同一设置保持所有相关连接器配置。

9.9.4. 配置 MirrorMaker 2 连接器制作者和消费者

MirrorMaker 2 连接器使用内部生产者和消费者。如果需要,您可以配置这些制作者和消费者来覆盖默认设置。

例如,您可以增加源制作者的 batch.size,将主题发送到目标 Kafka 集群,以更好地容纳大量信息。

重要

生产者和消费者配置选项取决于 MirrorMaker 2 的实施,并可能随时更改。

下表描述了每个连接器的生产者和消费者,以及您可以添加配置的位置。

表 9.4. 源连接器生成者和消费者
类型Description配置

制作者

将主题信息发送到目标 Kafka 集群。在处理大量数据时,请考虑调整此制作者的配置。

mirrors.sourceConnector.config: producer.override prerequisites

制作者

写入 offset-syncs 主题,它将映射复制主题分区的源和目标偏移。

mirrors.sourceConnector.config: producer.*

消费者

从源 Kafka 集群检索主题信息。

mirrors.sourceConnector.config: consumer.*

表 9.5. Checkpoint 连接器生成者和消费者
类型Description配置

制作者

发送消费者偏移检查点。

mirrors.checkpointConnector.config: producer.override prerequisites

消费者

加载 offset-syncs 主题。

mirrors.checkpointConnector.config: consumer.*

注意

您可以将 offset-syncs.topic.location 设置为 target 来使用目标 Kafka 集群作为 offset-syncs 主题的位置。

表 9.6. heartbeat 连接器制作者
类型Description配置

制作者

发送心跳。

mirrors.heartbeatConnector.config: producer.override prerequisites

以下示例演示了如何配置制作者和消费者。

连接器制作者和消费者的配置示例

Copy to Clipboard Toggle word wrap
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 3.8.0
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      tasksMax: 5
      config:
        producer.override.batch.size: 327680
        producer.override.linger.ms: 100
        producer.request.timeout.ms: 30000
        consumer.fetch.max.bytes: 52428800
        # ...
    checkpointConnector:
      config:
        producer.override.request.timeout.ms: 30000
        consumer.max.poll.interval.ms: 300000
        # ...
    heartbeatConnector:
      config:
        producer.override.request.timeout.ms: 30000
        # ...

9.9.5. 指定最大数据复制任务数

连接器创建负责在 Kafka 中移动数据的任务。每个连接器由一个或多个任务组成,它们分布到运行任务的一组 worker pod 中。在复制大量分区或同步大量消费者组的偏移时,增加任务数量可以帮助解决性能问题。

任务并行运行。为 worker 分配一个或多个任务。单个任务由一个 worker pod 处理,因此您不需要多个 worker pod 超过任务。如果有多个任务,worker 会处理多个任务。

您可以使用 tasksMax 属性指定 MirrorMaker 配置中的最大连接器任务数量。在不指定最大任务数量的情况下,默认设置是单个任务。

heartbeat 连接器始终使用单个任务。

为源和检查点连接器启动的任务数量是最大可能任务数和 tasksMax 的值之间的较低值。对于源连接器,可能的最大任务数是从源集群复制的每个分区。对于检查点连接器,可能的最大任务数都是从源集群复制的每个消费者组。在设置最多任务数量时,请考虑分区数量和支持进程的硬件资源。

如果基础架构支持处理开销,增加任务数量可以提高吞吐量和延迟。例如,添加更多任务可减少在有大量分区或消费者组时轮询源集群所需的时间。

当您有大量分区时,为源连接器增加任务数量很有用。

为源连接器增加任务数量

Copy to Clipboard Toggle word wrap
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      tasksMax: 10
  # ...

当您有大量消费者组时,为检查点连接器增加任务数量很有用。

增加检查点连接器的任务数量

Copy to Clipboard Toggle word wrap
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    checkpointConnector:
      tasksMax: 10
  # ...

默认情况下,MirrorMaker 2 每 10 分钟检查新消费者组。您可以调整 refresh.groups.interval.seconds 配置以更改频率。在调整降低时请小心。更频繁的检查可能会对性能造成负面影响。

9.9.5.1. 检查连接器任务操作

如果使用 Prometheus 和 Grafana 监控部署,您可以检查 MirrorMaker 2 性能。由 Apache Kafka 的 Streams 提供的 MirrorMaker 2 Grafana 仪表板示例显示了以下与任务和延迟相关的指标。

  • 任务数量
  • 复制延迟
  • 偏移同步延迟

其他资源

9.9.6. 为远程主题同步 ACL 规则

将 MirrorMaker 2 与 Apache Kafka 的 Streams 搭配使用时,可以同步远程主题的 ACL 规则。但是,只有在您没有使用 User Operator 时,此功能才可用。

如果您使用 类型:在没有 User Operator 的情况下进行简单 授权,管理对代理的访问的 ACL 规则也适用于远程主题。这意味着,对源主题具有读取访问权限的用户也可以读取其远程等效内容。

注意

OAuth 2.0 授权不支持以这种方式访问远程主题。

9.9.7. 保护 Kafka MirrorMaker 2 部署

此流程描述了概述保护 MirrorMaker 2 部署所需的配置。

源 Kafka 集群和目标 Kafka 集群需要单独的配置。您还需要单独的用户配置来提供 MirrorMaker 连接到源和目标 Kafka 集群所需的凭证。

对于 Kafka 集群,您可以为 OpenShift 集群内的安全连接指定内部监听程序,以及用于 OpenShift 集群外的连接的外部监听程序。

您可以配置身份验证和授权机制。为源和目标 Kafka 集群实施的安全选项必须与为 MirrorMaker 2 实施的安全选项兼容。

创建集群和用户身份验证凭证后,您可以在 MirrorMaker 配置中指定它们以进行安全连接。

注意

在此过程中,会使用 Cluster Operator 生成的证书,但 您可以通过安装自己的证书 来替换它们。您还可以将侦听器 配置为使用由外部 CA (证书颁发机构)管理的 Kafka 侦听器证书

开始前

在开始这个过程前,请查看 Streams for Apache Kafka 提供 的示例配置文件。它们包括使用 mTLS 或 SCRAM-SHA-512 身份验证保护 MirrorMaker 2 部署的示例。示例指定用于在 OpenShift 集群内连接的内部监听程序。

这个示例还为完整的授权提供配置,包括允许用户在源和目标 Kafka 集群上操作的 ACL。

在配置对源和目标 Kafka 集群的用户访问时,ACL 必须授予对内部 MirrorMaker 2 连接器和目标集群组和目标集群中底层 Kafka Connect 框架使用的内部主题的访问权限。如果您重命名了集群组或内部主题,比如 为多个实例配置 MirrorMaker 2 时,请在 ACL 配置中使用这些名称。

简单授权使用由 Kafka AclAuthorizerStandardAuthorizer 插件管理的 ACL 规则来确保适当的访问级别。有关将 KafkaUser 资源配置为使用简单授权的更多信息,请参阅 AclRule 模式参考

先决条件

  • Apache Kafka 的流正在运行
  • 源和目标集群的独立命名空间

此流程假设将源和目标集群安装到单独的命名空间。如果要使用 Topic Operator,则需要执行此操作。主题 Operator 只监视指定命名空间中的单个集群。

通过将集群划分为命名空间,您需要复制集群 secret,以便可以在命名空间外访问它们。您需要引用 MirrorMaker 配置中的 secret。

流程

  1. 配置两个 Kafka 资源,一个用于保护源 Kafka 集群,另一个用于保护目标 Kafka 集群。

    您可以为身份验证和启用授权添加监听程序配置。

    在本例中,为带有 TLS 加密和 mTLS 身份验证的 Kafka 集群配置了内部监听程序。启用 Kafka 简单 授权。

    使用 TLS 加密和 mTLS 身份验证的源 Kafka 集群配置示例

    Copy to Clipboard Toggle word wrap
    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-source-cluster
    spec:
      kafka:
        version: 3.8.0
        replicas: 1
        listeners:
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication:
              type: tls
        authorization:
          type: simple
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
          inter.broker.protocol.version: "3.8"
        storage:
          type: jbod
          volumes:
          - id: 0
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
      zookeeper:
        replicas: 1
        storage:
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
      entityOperator:
        topicOperator: {}
        userOperator: {}

    使用 TLS 加密和 mTLS 身份验证的目标 Kafka 集群配置示例

    Copy to Clipboard Toggle word wrap
    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-target-cluster
    spec:
      kafka:
        version: 3.8.0
        replicas: 1
        listeners:
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication:
              type: tls
        authorization:
          type: simple
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
          inter.broker.protocol.version: "3.8"
        storage:
          type: jbod
          volumes:
            - id: 0
              type: persistent-claim
              size: 100Gi
              deleteClaim: false
      zookeeper:
        replicas: 1
        storage:
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
      entityOperator:
        topicOperator: {}
        userOperator: {}

  2. 在单独的命名空间中创建或更新 Kafka 资源。

    Copy to Clipboard Toggle word wrap
    oc apply -f <kafka_configuration_file> -n <namespace>

    Cluster Operator 创建监听程序并设置集群和客户端证书颁发机构(CA)证书,以便在 Kafka 集群中启用身份验证。

    证书在 secret < cluster_name> -cluster-ca-cert 中创建。

  3. 配置两个 KafkaUser 资源,一个用于源 Kafka 集群的用户,另一个用于目标 Kafka 集群的用户。

    1. 配置与对应的源和目标 Kafka 集群相同的身份验证和授权类型。例如,如果您在源 Kafka 集群的 Kafka 配置中使用了 tls 验证和 simple 授权类型,请在 KafkaUser 配置中使用相同的。
    2. 配置 MirrorMaker 2 所需的 ACL,以允许对源和目标 Kafka 集群执行操作。

    mTLS 验证的源用户配置示例

    Copy to Clipboard Toggle word wrap
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-source-user
      labels:
        strimzi.io/cluster: my-source-cluster
    spec:
      authentication:
        type: tls
      authorization:
        type: simple
        acls:
          # MirrorSourceConnector
          - resource: # Not needed if offset-syncs.topic.location=target
              type: topic
              name: mm2-offset-syncs.my-target-cluster.internal
            operations:
              - Create
              - DescribeConfigs
              - Read
              - Write
          - resource: # Needed for every topic which is mirrored
              type: topic
              name: "*"
            operations:
              - DescribeConfigs
              - Read
          # MirrorCheckpointConnector
          - resource:
              type: cluster
            operations:
              - Describe
          - resource: # Needed for every group for which offsets are synced
              type: group
              name: "*"
            operations:
              - Describe
          - resource: # Not needed if offset-syncs.topic.location=target
              type: topic
              name: mm2-offset-syncs.my-target-cluster.internal
            operations:
              - Read

    mTLS 验证的目标用户配置示例

    Copy to Clipboard Toggle word wrap
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-target-user
      labels:
        strimzi.io/cluster: my-target-cluster
    spec:
      authentication:
        type: tls
      authorization:
        type: simple
        acls:
          # cluster group
          - resource:
              type: group
              name: mirrormaker2-cluster
            operations:
              - Read
          # access to config.storage.topic
          - resource:
              type: topic
              name: mirrormaker2-cluster-configs
            operations:
              - Create
              - Describe
              - DescribeConfigs
              - Read
              - Write
          # access to status.storage.topic
          - resource:
              type: topic
              name: mirrormaker2-cluster-status
            operations:
              - Create
              - Describe
              - DescribeConfigs
              - Read
              - Write
          # access to offset.storage.topic
          - resource:
              type: topic
              name: mirrormaker2-cluster-offsets
            operations:
              - Create
              - Describe
              - DescribeConfigs
              - Read
              - Write
          # MirrorSourceConnector
          - resource: # Needed for every topic which is mirrored
              type: topic
              name: "*"
            operations:
              - Create
              - Alter
              - AlterConfigs
              - Write
          # MirrorCheckpointConnector
          - resource:
              type: cluster
            operations:
              - Describe
          - resource:
              type: topic
              name: my-source-cluster.checkpoints.internal
            operations:
              - Create
              - Describe
              - Read
              - Write
          - resource: # Needed for every group for which the offset is synced
              type: group
              name: "*"
            operations:
              - Read
              - Describe
          # MirrorHeartbeatConnector
          - resource:
              type: topic
              name: heartbeats
            operations:
              - Create
              - Describe
              - Write

    注意

    您可以通过将 type 设置为 tls-external 来使用 User Operator 外部发布的证书。如需更多信息,请参阅 KafkaUserSpec 模式参考

  4. 在您为源和目标 Kafka 集群创建的每个命名空间中创建或更新 KafkaUser 资源。

    Copy to Clipboard Toggle word wrap
    oc apply -f <kafka_user_configuration_file> -n <namespace>

    User Operator 根据所选的验证类型创建代表客户端(MirrorMaker)的用户,以及用于客户端身份验证的安全凭证。

    User Operator 创建一个名称与 KafkaUser 资源相同的新 secret。secret 包含 mTLS 验证的私钥和公钥。公钥包含在用户证书中,该证书由客户端 CA 签名。

  5. 使用身份验证详情配置 KafkaMirrorMaker2 资源,以连接到源和目标 Kafka 集群。

    带有 TLS 加密和 mTLS 身份验证的 MirrorMaker 2 配置示例

    Copy to Clipboard Toggle word wrap
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mirror-maker-2
    spec:
      version: 3.8.0
      replicas: 1
      connectCluster: "my-target-cluster"
      clusters:
        - alias: "my-source-cluster"
          bootstrapServers: my-source-cluster-kafka-bootstrap:9093
          tls: 
    1
    
            trustedCertificates:
              - secretName: my-source-cluster-cluster-ca-cert
                pattern: "*.crt"
          authentication: 
    2
    
            type: tls
            certificateAndKey:
              secretName: my-source-user
              certificate: user.crt
              key: user.key
        - alias: "my-target-cluster"
          bootstrapServers: my-target-cluster-kafka-bootstrap:9093
          tls: 
    3
    
            trustedCertificates:
              - secretName: my-target-cluster-cluster-ca-cert
                pattern: "*.crt"
          authentication: 
    4
    
            type: tls
            certificateAndKey:
              secretName: my-target-user
              certificate: user.crt
              key: user.key
          config:
            # -1 means it will use the default replication factor configured in the broker
            config.storage.replication.factor: -1
            offset.storage.replication.factor: -1
            status.storage.replication.factor: -1
      mirrors:
        - sourceCluster: "my-source-cluster"
          targetCluster: "my-target-cluster"
          sourceConnector:
            config:
              replication.factor: 1
              offset-syncs.topic.replication.factor: 1
              sync.topic.acls.enabled: "false"
          heartbeatConnector:
            config:
              heartbeats.topic.replication.factor: 1
          checkpointConnector:
            config:
              checkpoints.topic.replication.factor: 1
              sync.group.offsets.enabled: "true"
          topicsPattern: "topic1|topic2|topic3"
          groupsPattern: "group1|group2|group3"

    1
    源 Kafka 集群的 TLS 证书。如果它们位于单独的命名空间中,请将集群 secret 从 Kafka 集群的命名空间中复制。
    2
    使用 TLS 机制访问源 Kafka 集群的用户身份验证。
    3
    目标 Kafka 集群的 TLS 证书。
    4
    访问目标 Kafka 集群的用户身份验证。
  6. 在与目标 Kafka 集群相同的命名空间中创建或更新 KafkaMirrorMaker2 资源。

    Copy to Clipboard Toggle word wrap
    oc apply -f <mirrormaker2_configuration_file> -n <namespace_of_target_cluster>

9.9.8. 手动停止或暂停 MirrorMaker 2 连接器

如果您使用 KafkaMirrorMaker2 资源来配置内部 MirrorMaker 连接器,请使用 state 配置来停止或暂停连接器。与连接器和任务保持实例化的暂停状态不同,停止连接器只保留配置,且没有活跃进程。从运行停止连接器可能更适合长时间运行,而不是只暂停。虽然暂停的连接器速度更快恢复,但已停止的连接器具有释放内存和资源的优点。

注意

state 配置替换 KafkaMirrorMaker2ConnectorSpec 模式中的(已弃用) pause 配置,它允许暂停连接器。如果您之前使用 pause 配置来暂停连接器,我们建议您只使用 state 配置过渡到 以避免冲突。

先决条件

  • Cluster Operator 正在运行。

流程

  1. 查找控制要暂停或停止 MirrorMaker 2 连接器的 KafkaMirrorMaker2 自定义资源的名称:

    Copy to Clipboard Toggle word wrap
    oc get KafkaMirrorMaker2
  2. 编辑 KafkaMirrorMaker2 资源,以停止或暂停连接器。

    停止 MirrorMaker 2 连接器的配置示例

    Copy to Clipboard Toggle word wrap
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mirror-maker2
    spec:
      version: 3.8.0
      replicas: 3
      connectCluster: "my-cluster-target"
      clusters:
        # ...
      mirrors:
      - sourceCluster: "my-cluster-source"
        targetCluster: "my-cluster-target"
        sourceConnector:
          tasksMax: 10
          autoRestart:
            enabled: true
          state: stopped
      # ...

    state 配置更改为 stoppedpaused。当此属性没有设置时,连接器的默认状态为 running

  3. 将更改应用到 KafkaMirrorMaker2 配置。

    您可以通过将 state 改为 running,或删除配置来恢复连接器。

注意

另外,您可以 公开 Kafka Connect API,并使用 stoppause 端点停止连接器运行。例如,PUT /connectors/<connector_name>/stop。然后,您可以使用 resume 端点重启它。

9.9.9. 手动重启 MirrorMaker 2 连接器

使用 strimzi.io/restart-connector 注解来手动触发 MirrorMaker 2 连接器的重启。

先决条件

  • Cluster Operator 正在运行。

流程

  1. 查找控制您要重启的 Kafka MirrorMaker 2 连接器的 KafkaMirrorMaker2 自定义资源的名称:

    Copy to Clipboard Toggle word wrap
    oc get KafkaMirrorMaker2
  2. 查找要从 KafkaMirrorMaker2 自定义资源重启的 Kafka MirrorMaker 2 连接器的名称:

    Copy to Clipboard Toggle word wrap
    oc describe KafkaMirrorMaker2 <mirrormaker_cluster_name>
  3. 通过在 OpenShift 中注解 KafkaMirrorMaker2 资源,使用连接器的名称来重启连接器:

    Copy to Clipboard Toggle word wrap
    oc annotate KafkaMirrorMaker2 <mirrormaker_cluster_name> "strimzi.io/restart-connector=<mirrormaker_connector_name>"

    在本例中,my-mirror-maker-2 集群中的连接器 my-connector 被重启:

    Copy to Clipboard Toggle word wrap
    oc annotate KafkaMirrorMaker2 my-mirror-maker-2 "strimzi.io/restart-connector=my-connector"
  4. 等待下一个协调发生(默认为两分钟)。

    MirrorMaker 2 连接器会重启,只要协调过程检测到注解。当 MirrorMaker 2 接受请求时,注解会从 KafkaMirrorMaker2 自定义资源中删除。

9.9.10. 手动重启 MirrorMaker 2 连接器任务

使用 strimzi.io/restart-connector-task 注解来手动触发 MirrorMaker 2 连接器的重启。

先决条件

  • Cluster Operator 正在运行。

流程

  1. 查找控制您要重启的 MirrorMaker 2 连接器任务的 KafkaMirrorMaker2 自定义资源的名称:

    Copy to Clipboard Toggle word wrap
    oc get KafkaMirrorMaker2
  2. 查找连接器的名称和要从 KafkaMirrorMaker2 自定义资源重启的任务 ID:

    Copy to Clipboard Toggle word wrap
    oc describe KafkaMirrorMaker2 <mirrormaker_cluster_name>

    任务 ID 是非负整数,从 0 开始。

  3. 通过在 OpenShift 中注解 KafkaMirrorMaker2 资源,使用名称和 ID 重启连接器任务:

    Copy to Clipboard Toggle word wrap
    oc annotate KafkaMirrorMaker2 <mirrormaker_cluster_name> "strimzi.io/restart-connector-task=<mirrormaker_connector_name>:<task_id>"

    在本例中,在 my-mirror-maker-2 集群中连接器 my-connector 的任务 0 被重启:

    Copy to Clipboard Toggle word wrap
    oc annotate KafkaMirrorMaker2 my-mirror-maker-2 "strimzi.io/restart-connector-task=my-connector:0"
  4. 等待下一个协调发生(默认为两分钟)。

    MirrorMaker 2 连接器任务会重启,只要协调过程检测到注解。当 MirrorMaker 2 接受请求时,注解会从 KafkaMirrorMaker2 自定义资源中删除。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat, Inc.