2.5. Kafka MirrorMaker 2.0 集群配置


本节论述了如何在 AMQ Streams 集群中配置 Kafka MirrorMaker 2.0 部署。

MirrorMaker 2.0 用于在两个或多个活跃的 Kafka 集群间复制数据。

集群中的数据复制支持需要的情况:

  • 在系统失败时恢复数据
  • 聚合数据进行分析
  • 特定集群的数据访问限制
  • 在特定位置置备数据以提高延迟

如果使用 MirrorMaker 2.0,请配置 KafkaMirrorMaker2 资源。KafkaMirrorMaker2 资源的完整 schema 在 KafkaMirrorMaker2 模式中描述。

MirrorMaker 2.0 引入了全新的方法来在集群间复制数据。

因此,资源配置与之前版本的 MirrorMaker 不同。如果您选择使用 MirrorMaker 2.0,目前还没有旧的支持,因此任何资源都必须手动转换为新格式。

2.5.1. MirrorMaker 2.0 数据复制

MirrorMaker 2.0 使用来自源 Kafka 集群的信息,并将其写入目标 Kafka 集群。

MirrorMaker 2.0 使用:

  • 源集群配置使用来自源集群的数据
  • 将数据输出到目标集群的目标集群配置

MirrorMaker 2.0 基于 Kafka Connect 框架,连接器 管理集群之间的数据传输。

MirrorMaker 2.0 使用以下连接器:

MirrorSourceConnector
源连接器将主题从源集群复制到目标集群。
MirrorCheckpointConnector
检查点连接器定期跟踪偏移。如果启用,它还会在源和目标集群之间同步消费者组偏移。
MirrorHeartbeatConnector
heartbeat 连接器会定期检查源和目标集群之间的连接。

镜像 从一个集群镜像到另一个集群的过程是异步的。推荐的模式是在本地和源 Kafka 集群一起生成信息,然后远程消耗到目标 Kafka 集群。

MirrorMaker 2.0 可以和多个源集群一起使用。

图 2.1. 在两个集群间复制

默认情况下,每隔 10 分钟对源集群中的新主题进行一次检查。您可以通过在源连接器配置中添加 refresh.topics.interval.seconds 来更改频率。但是,增加操作的频率可能会影响整体性能。

2.5.2. 集群配置

您可以在主动/被动主动/主动集群配置中使用 MirrorMaker 2.0。

  • 主动/主动配置中,两个集群都处于活跃状态,并同时提供相同的数据,如果您需要在不同地理位置的本地都提供相同的数据,这个配置非常有用。
  • 主动/被动配置中,主动集群中的数据会在被动集群被复制,后者处于待机状态(例如在系统失败时进行数据恢复)。

预计生产者和消费者仅连接到活跃集群。

每个目标目的地都需要一个 MirrorMaker 2.0 集群。

2.5.2.1. 双向复制(主动/主动)

MirrorMaker 2.0 架构支持 主动/主动集群配置中 双向复制。

每个集群使用 源和目标 主题的概念复制其他 集群的数据。与每个集群中存储相同的主题,远程主题由 MirrorMaker 2.0 自动重命名来代表源集群。原始集群的名称前面是主题名称的前面。

图 2.2. 主题重命名

通过标记原始集群,主题不会重新复制到该集群。

在配置需要数据聚合的构架时,通过远程 主题复制的概念非常有用。消费者可以订阅同一集群中的源和远程主题,而无需单独聚合集群。

2.5.2.2. 单向复制(主动/被动)

MirrorMaker 2.0 架构支持 主动/被动 集群配置中的单向复制。

您可以使用 主动/被动 集群配置进行备份或将数据迁移到另一个集群。在这种情况下,您可能不希望自动重命名远程主题。

您可以通过将 IdentityReplicationPolicy 添加到源连接器配置来覆盖自动重命名。应用此配置后,主题会保留其原始名称。

2.5.2.3. 主题配置同步

主题配置在源和目标集群之间自动同步。通过同步配置属性,减少了重新平衡的需求。

2.5.2.4. 数据完整性

MirrorMaker 2.0 监控源主题,并将任何配置更改传播到远程主题,检查和创建缺失的分区。只有 MirrorMaker 2.0 才能写入到远程主题。

2.5.2.5. 偏移跟踪

MirrorMaker 2.0 使用内部主题来跟踪消费者组的偏移量。

  • offset-syncs 主题从记录元数据映射复制主题分区的源和目标偏移。
  • checkpoints 主题映射源和目标集群中每个消费者组中复制的主题分区的最后提交偏移量

MirrorCheckpointConnector emits 检查点 用于偏移跟踪。检查点 主题的偏移是通过配置预先确定的间隔跟踪。这两个主题都可让复制从故障转移上的正确偏移位置完全恢复。

默认情况下,offset-syncs 主题的位置是 源集群。您可以使用 offset-syncs.topic.location 连接器配置将其更改为 目标集群。您需要对包含主题的集群进行读/写访问。使用目标集群作为 偏移同步 主题的位置,允许您使用 MirrorMaker 2.0,即使您只对源集群具有读取访问权限。

2.5.2.6. 同步消费者组偏移

__consumer_offsets 主题存储各个消费者组的提交偏移信息。偏移同步定期将源集群的使用者偏移量传输到目标集群的使用者偏移主题。

偏移同步在 主动/被动配置 中特别有用。如果活跃集群停机,使用者应用程序可以切换到被动(被动)集群,并从最后传输的偏移位置获取。

要使用主题偏移同步,通过在检查点连接器配置中添加 sync.group.offsets.enabled 来启用同步,并将 属性设为 true。默认禁用同步。

在源连接器中使用 IdentityReplicationPolicy 时,还必须在检查点连接器配置中配置。这样可确保为正确的主题应用镜像使用者偏移量。

使用者偏移仅为目标集群中不活动的消费者组同步。如果消费者组在目标集群中,则无法执行同步,并返回 UNKNOWN_MEMBER_ID 错误。

如果启用,将定期从源集群同步偏移。您可以通过在检查点连接器配置中添加 sync.group.offsets.secondsemit.checkpoints.interval.seconds 来更改频率。属性指定消费者组偏移的频率,以及针对偏移跟踪发出的检查点的频率。两个属性的默认值都是 60 秒。您还可以使用 refresh.groups.interval.seconds 属性更改新消费者组的检查频率,该属性默认为每 10 分钟执行一次。

因为同步是基于时间的,所以消费者到被动集群的任何交换机都可能会导致消息重复。

2.5.2.7. 连接检查

MirrorHeartbeatConnector 发出 heartbeats 来检查集群间的连接。

从源集群复制内部 心跳 主题。目标集群使用 heartbeat 主题来检查以下内容:

  • 管理集群间连接的连接器正在运行
  • 源集群可用

2.5.3. 连接器配置

将 Mirrormaker 2.0 连接器配置用于内部连接器,以编配 Kafka 集群之间的数据同步。

下表描述了连接器属性和您配置的连接器。

Expand
表 2.1. MirrorMaker 2.0 连接器配置属性
属性sourceConnectorcheckpointConnectorheartbeatConnector
admin.timeout.ms
管理任务的超时,如检测新主题。默认为 60000 (1 分钟)。

replication.policy.class
定义远程主题命名约定的策略。默认为 org.apache.kafka.connect.mirror.DefaultReplicationPolicy

replication.policy.separator
目标集群中用于主题命名的分隔符。默认为 (dot)。只有在 replication.policy.classDefaultReplicationPolicy 时,它才会被使用。

consumer.poll.timeout.ms
轮询源集群时的超时。默认值为 1000 (1 秒)。

 
offset-syncs.topic.location
offset-syncs 主题的位置,可以是 (默认)或 目标集群

 
topic.filter.class
主题过滤器,以选择要复制的主题。默认为 org.apache.kafka.connect.mirror.DefaultTopicFilter

 
config.property.filter.class
主题过滤器,选择要复制的主题配置属性。默认为 org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter

  
config.properties.exclude
不应复制的主题配置属性。支持以逗号分隔的属性名称和正则表达式。

  
offset.lag.max
同步远程分区前最多允许(out-of-sync)偏移偏移量。默认值为 100

  
offset-syncs.topic.replication.factor
内部偏移同步 主题的复制因素。默认为 3

  
refresh.topics.enabled
启用检查新主题和分区。默认为 true

  
refresh.topics.interval.seconds
主题刷新频率。默认为 600 (10 分钟)。

  
replication.factor
新主题的复制因素。默认为 2

  
sync.topic.acls.enabled
在源集群中启用 ACL 同步。默认为 true。与 User Operator 不兼容。

  
sync.topic.acls.interval.seconds
ACL 同步频率。默认为 600 (10 分钟)。

  
sync.topic.configs.enabled
启用源集群中的主题配置同步。默认为 true

  
sync.topic.configs.interval.seconds
主题配置同步频率。默认 600 (10 分钟)。

  
checkpoints.topic.replication.factor
内部 检查点 主题的复制因素。默认为 3
 

 
emit.checkpoints.enabled
启用将消费者偏移同步到目标集群。默认为 true
 

 
emit.checkpoints.interval.seconds
消费者偏移同步的频率。默认为 60 (1 分钟)。
 

 
group.filter.class
组过滤器,以选择要复制的使用者组。默认为 org.apache.kafka.connect.mirror.DefaultGroupFilter
 

 
refresh.groups.enabled
为新的消费者组启用检查。默认为 true
 

 
refresh.groups.interval.seconds
消费者组刷新频率。默认为 600 (10 分钟)。
 

 
sync.group.offsets.enabled
启用将消费者组偏移同步到目标集群 __consumer_offsets 主题。默认为 false
 

 
sync.group.offsets.interval.seconds
消费者组偏移同步的频率。默认为 60 (1 分钟)。
 

 
emit.heartbeats.enabled
在目标集群中启用连接检查。默认为 true
  

emit.heartbeats.interval.seconds
连接检查的频率。默认为 1 ( 1 秒)。
  

heartbeats.topic.replication.factor
内部 心跳 主题的复制因素。默认为 3
  

2.5.4. 连接器制作者和消费者配置

MirrorMaker 2.0 连接器使用内部制作者和消费者。如果需要,您可以配置这些制作者和消费者来覆盖默认设置。

例如,您可以提高源制作者的 batch.size,该制作者会向目标 Kafka 集群发送主题以更好地容纳大量消息。

重要

生产者和使用者配置选项取决于 MirrorMaker 2.0 实施,可能会有所变化。

下表描述了每个连接器以及您可在其中添加配置的制作者和消费者。

Expand
表 2.2. 源连接器制作者和使用者
类型Description配置

制作者

将主题信息发送到目标 Kafka 集群。在处理大量数据时,请考虑调优此制作者的配置。

mirrors.sourceConnector.config: producer.override.*

制作者

写入 offset-syncs 主题,用于映射复制主题分区的源和目标偏移。

mirrors.sourceConnector.config: producer.*

消费者

从源 Kafka 集群检索主题信息。

mirrors.sourceConnector.config: consumer.*

Expand
表 2.3. Checkpoint 连接器制作者和消费者
类型Description配置

制作者

发送消费者偏移检查点。

mirrors.checkpointConnector.config: producer.override.*

消费者

加载 offset-syncs 主题。

mirrors.checkpointConnector.config: consumer.*

注意

您可以将 offset-syncs.topic.location 设置为 target 来使用目标 Kafka 集群作为 offset-syncs 主题的位置。

Expand
表 2.4. heartbeat 连接器制作器
类型Description配置

制作者

发送心跳.

mirrors.heartbeatConnector.config: producer.override.*

以下示例演示了如何配置制作者和使用者。

连接器制作者和消费者配置示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 3.2.3
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      tasksMax: 5
      config:
        producer.override.batch.size: 327680
        producer.override.linger.ms: 100
        producer.request.timeout.ms: 30000
        consumer.fetch.max.bytes: 52428800
        # ...
    checkpointConnector:
      config:
        producer.override.request.timeout.ms: 30000
        consumer.max.poll.interval.ms: 300000
        # ...
    heartbeatConnector:
      config:
        producer.override.request.timeout.ms: 30000
        # ...
Copy to Clipboard Toggle word wrap

2.5.5. 指定最多任务数

连接器创建负责在 Kafka 中和移出数据的任务。每个连接器都包含一个或多个任务,它们分布在一组运行任务的 worker pod 中。在复制大量分区或同步大量消费者组的偏移时,增加任务数量可帮助出现性能问题。

任务并行运行。为 worker 分配一个或多个任务。单个任务由一个 worker pod 处理,因此您不需要更多的 worker pod。如果任务数量超过 worker,worker 会处理多个任务。

您可以使用 tasksMax 属性指定 mirrorMaker 配置中的最大连接器任务数量。如果不指定最多任务数,则默认设置是一个单任务。

heartbeat 连接器始终使用单个任务。

为源和检查点连接器启动的任务数量是最大可能任务和 tasksMax 的值之间的低值。对于源连接器,可以最多的任务数是从源集群复制的每个分区。对于 checkpoint 连接器,可以为每个消费者组从源集群复制的最多任务数。当设置最多任务数时,请考虑分区的数量和支持该进程的硬件资源。

如果基础架构支持处理开销,增加任务数量可提高吞吐量和延迟。例如,添加更多任务可减少在有大量分区或消费者组时轮询源集群所需的时间。

当您有大量分区时,增加检查点连接器的任务数量会很有用。

为源连接器增加任务数量

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector:
      tasksMax: 10
  # ...
Copy to Clipboard Toggle word wrap

当您有大量消费者组时,增加检查点连接器的任务数量会很有用。

增加检查点连接器的任务数量

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  # ...
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    checkpointConnector:
      tasksMax: 10
  # ...
Copy to Clipboard Toggle word wrap

默认情况下,MirrorMaker 2.0 会每 10 分钟检查新的消费者组。您可以调整 refresh.groups.interval.seconds 配置以更改频率。在调整调整时要小心。更频繁的检查会对性能造成负面影响。

2.5.5.1. 检查连接器任务操作

如果使用 Prometheus 和 Grafana 监控部署,您可以检查 MirrorMaker 2.0 性能。AMQ Streams 提供的 MirrorMaker 2.0 Grafana 仪表板示例显示了与任务和延迟相关的以下指标。

  • 任务数量
  • 复制延迟
  • 偏移同步延迟

2.5.6. ACL 规则同步

如果您不使用 User Operator,则可使用 ACL 访问远程主题。

如果使用 AclAuthorizer,但没有 User Operator,则管理对代理的访问的 ACL 规则也适用于远程主题。可以读取源主题的用户可以读取其远程等效功能。

注意

OAuth 2.0 授权不支持以这种方式访问远程主题。

2.5.7. Configuring Kafka MirrorMaker 2.0

使用 KafkaMirrorMaker2 资源的属性来配置 Kafka MirrorMaker 2.0 部署。使用 MirrorMaker 2.0 在 Kafka 集群间同步数据

配置必须指定:

  • 每个 Kafka 集群
  • 每个集群的连接信息,包括 TLS 身份验证
  • 复制流和方向

    • 集群到集群
    • 主题
注意

以前版本的 MirrorMaker 将继续被支持。如果要使用为之前的版本配置的资源,必须将其更新为 MirrorMaker 2.0 支持的格式。

MirrorMaker 2.0 为复制因素等属性提供默认配置值。最小配置(默认值保持不变)会类似以下示例:

MirrorMaker 2.0 的最小配置

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: my-mirror-maker2
spec:
  version: 3.2.3
  connectCluster: "my-cluster-target"
  clusters:
  - alias: "my-cluster-source"
    bootstrapServers: my-cluster-source-kafka-bootstrap:9092
  - alias: "my-cluster-target"
    bootstrapServers: my-cluster-target-kafka-bootstrap:9092
  mirrors:
  - sourceCluster: "my-cluster-source"
    targetCluster: "my-cluster-target"
    sourceConnector: {}
Copy to Clipboard Toggle word wrap

您可以使用 TLS 或 SASL 身份验证为源和目标集群配置访问控制。此流程显示对源和目标集群使用 TLS 加密和身份验证的配置。

您可以在 KafkaMirrorMaker2 资源中指定要从源集群复制的主题和消费者组。您可以使用 topicsPatterngroupsPattern 属性进行此操作。您可以提供名称或使用正则表达式的列表。默认情况下,如果没有设置 topicsPatterngroupsPattern 属性,则会复制所有主题和消费者组。您还可以使用 ".*" 作为正则表达式来复制所有主题和消费者组。但是,尝试只指定您所需的主题和消费者组,以避免造成集群中的不必要的额外负载。

处理大量消息

您可以调整配置来处理大量消息。更多信息请参阅 第 2.7 节 “处理大量消息”

先决条件

  • AMQ Streams 处于运行状态
  • 源集群和目标 Kafka 集群可用

流程

  1. 编辑 KafkaMirrorMaker2 资源的 spec 属性。

    您可以在以下示例配置中显示您可以配置的属性:

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mirror-maker2
    spec:
      version: 3.2.3 
    1
    
      replicas: 3 
    2
    
      connectCluster: "my-cluster-target" 
    3
    
      clusters: 
    4
    
      - alias: "my-cluster-source" 
    5
    
        authentication: 
    6
    
          certificateAndKey:
            certificate: source.crt
            key: source.key
            secretName: my-user-source
          type: tls
        bootstrapServers: my-cluster-source-kafka-bootstrap:9092 
    7
    
        tls: 
    8
    
          trustedCertificates:
          - certificate: ca.crt
            secretName: my-cluster-source-cluster-ca-cert
      - alias: "my-cluster-target" 
    9
    
        authentication: 
    10
    
          certificateAndKey:
            certificate: target.crt
            key: target.key
            secretName: my-user-target
          type: tls
        bootstrapServers: my-cluster-target-kafka-bootstrap:9092 
    11
    
        config: 
    12
    
          config.storage.replication.factor: 1
          offset.storage.replication.factor: 1
          status.storage.replication.factor: 1
          ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 
    13
    
          ssl.enabled.protocols: "TLSv1.2"
          ssl.protocol: "TLSv1.2"
          ssl.endpoint.identification.algorithm: HTTPS 
    14
    
        tls: 
    15
    
          trustedCertificates:
          - certificate: ca.crt
            secretName: my-cluster-target-cluster-ca-cert
      mirrors: 
    16
    
      - sourceCluster: "my-cluster-source" 
    17
    
        targetCluster: "my-cluster-target" 
    18
    
        sourceConnector: 
    19
    
          tasksMax: 10 
    20
    
          config:
            replication.factor: 1 
    21
    
            offset-syncs.topic.replication.factor: 1 
    22
    
            sync.topic.acls.enabled: "false" 
    23
    
            refresh.topics.interval.seconds: 60 
    24
    
            replication.policy.separator: "" 
    25
    
            replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy" 
    26
    
        heartbeatConnector: 
    27
    
          config:
            heartbeats.topic.replication.factor: 1 
    28
    
        checkpointConnector: 
    29
    
          config:
            checkpoints.topic.replication.factor: 1 
    30
    
            refresh.groups.interval.seconds: 600 
    31
    
            sync.group.offsets.enabled: true 
    32
    
            sync.group.offsets.interval.seconds: 60 
    33
    
            emit.checkpoints.interval.seconds: 60 
    34
    
            replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
        topicsPattern: "topic1|topic2|topic3" 
    35
    
        groupsPattern: "group1|group2|group3" 
    36
    
      resources: 
    37
    
        requests:
          cpu: "1"
          memory: 2Gi
        limits:
          cpu: "2"
          memory: 2Gi
      logging: 
    38
    
        type: inline
        loggers:
          connect.root.logger.level: "INFO"
      readinessProbe: 
    39
    
        initialDelaySeconds: 15
        timeoutSeconds: 5
      livenessProbe:
        initialDelaySeconds: 15
        timeoutSeconds: 5
      jvmOptions: 
    40
    
        "-Xmx": "1g"
        "-Xms": "1g"
      image: my-org/my-image:latest 
    41
    
      rack:
        topologyKey: topology.kubernetes.io/zone 
    42
    
      template: 
    43
    
        pod:
          affinity:
            podAntiAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                - labelSelector:
                    matchExpressions:
                      - key: application
                        operator: In
                        values:
                          - postgresql
                          - mongodb
                  topologyKey: "kubernetes.io/hostname"
        connectContainer: 
    44
    
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing:
        type: jaeger 
    45
    
      externalConfiguration: 
    46
    
        env:
          - name: AWS_ACCESS_KEY_ID
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsAccessKey
          - name: AWS_SECRET_ACCESS_KEY
            valueFrom:
              secretKeyRef:
                name: aws-creds
                key: awsSecretAccessKey
    Copy to Clipboard Toggle word wrap
    1
    Kafka Connect 和 Mirror Maker 2.0 版本,其始终相同。
    2
    运行任务的 worker 的副本数。
    3
    Kafka Connect 的 Kafka 集群别名,必须指定 目标 Kafka 集群。Kafka 连接使用 Kafka 集群作为其内部主题。
    4
    正在同步的 Kafka 集群的规格
    5
    源 Kafka 集群的集群别名
    6
    源集群的身份验证使用 TLS 机制,如此处所示,使用 OAuth bearer 令牌或基于 SASL 的 SCRAM-SHA-256/SCRAM-SHA-512PLAIN 机制。
    7
    8
    TLS 加密,使用密钥名称,其中 TLS 证书存储为源 Kafka 集群的 X.509 格式。如果证书存储在同一 secret 中,可以多次列出它。
    9
    目标 Kafka 集群的集群别名
    10
    目标 Kafka 集群的身份验证的配置方式与源 Kafka 集群相同。
    11
    12
    Kafka 连接配置.可以提供标准 Apache Kafka 配置,仅限于不直接由 AMQ Streams 管理的属性。
    13
    使用 TLS 版本的特定 密码套件 运行外部监听程序的 SSL 属性
    14
    通过设置为 HTTPS 来启用主机名验证。空字符串禁用验证。
    15
    目标 Kafka 集群的 TLS 加密配置方式与源 Kafka 集群相同。
    16
    17
    MirrorMaker 2.0 连接器使用的源集群 别名
    18
    MirrorMaker 2.0 连接器使用的目标 集群的集群别名
    19
    创建远程 主题的 MirrorSourceConnector配置配置 会覆盖默认配置选项。
    20
    连接器可能会创建的最大任务数量。任务处理数据复制并并行运行。如果基础架构支持处理开销,则增加这个值可提高吞吐量。Kafka Connect 在集群的成员之间分发任务。如果任务数量超过 worker,则 worker 会被分配多个任务。对于接收器连接器,旨在为每个消耗的主题分区有一个任务。对于源连接器,可以并行运行的任务数量也可能依赖于外部系统。如果无法实现并行,连接器会创建一个数量少于最大任务数量。
    21
    在目标集群中创建的镜像主题的复制因素。
    22
    MirrorSourceConnector offset-syncs 内部主题的复制因素,它映射源和目标集群的偏移量。
    23
    启用 ACL 规则同步后,ACL 将应用到同步主题。默认值是 true。此功能与 User Operator 不兼容。如果使用 User Operator,请将此属性设置为 false
    24
    可选设置,用于更改新主题的检查频率。默认值每 10 分钟进行一次检查。
    25
    定义用于重命名远程主题的分隔符。
    26
    添加可覆盖远程主题自动重命名的策略。该主题不会用源集群的名称来附加名称,而是保留其原始名称。此可选设置可用于主动/被动备份和数据迁移。要配置主题偏移同步,必须为 checkpointConnector.config 设置此属性。
    27
    执行连接检查的 MirrorHeartbeatConnector配置配置 会覆盖默认配置选项。
    28
    在目标集群中创建的 heartbeat 主题的复制因素。
    29
    用于跟踪偏移 MirrorCheckpointConnector配置配置 会覆盖默认配置选项。
    30
    在目标集群中创建的检查点主题的复制因素。
    31
    可选设置,用于更改新消费者组的检查频率。默认值每 10 分钟进行一次检查。
    32
    可选设置同步消费者组偏移,这对于在主动/被动配置中恢复非常有用。默认情况下不启用同步。
    33
    如果启用了消费者组偏移的同步,您可以调整同步的频率。
    34
    调整偏移跟踪的检查频率。如果更改偏移同步的频率,您可能需要调整这些检查的频率。
    35
    来自源集群的主题复制 ,定义为用逗号分开的列表或正则表达式模式。源连接器复制指定的主题。Checkpoint 连接器跟踪指定主题的偏移量。此处我们按名称请求三个主题。
    36
    来自源集群的消费者组复制 ,定义为以逗号分隔的列表或正则表达式模式。checkpoint 连接器复制指定的使用者组。此处我们根据名称请求三个消费者组。
    37
    用于保留 支持的资源、当前 cpu 和内存 的请求,以及指定可消耗的最大资源数量。
    38
    指定的 Kafka Connect 日志记录器和日志级别 直接(内联)或通过 ConfigMap 间接添加(外部)。自定义 ConfigMap 必须放在 log4j.propertieslog4j2.properties 键下。对于 Kafka Connect log4j.rootLogger 日志记录器,您可以将日志级别设置为 INFO、ERROR、WARN、WARN、TRACE、DEBUG、FATAL 或 OFF。
    39
    状况检查,了解何时重启容器(持续)以及容器是否可以接受流量(就绪状态)。
    40
    JVM 配置选项,用于优化运行 Kafka MirrorMaker 的虚拟机(VM)性能。
    41
    ADVANCED OPTION: 容器镜像配置,这只在特殊情况下建议。
    42
    SPECIALIZED OPTION :对部署的意识 配置.这是用于在同一位置(而非跨地区)部署的专用选项。如果您希望连接器从最接近的副本而不是领导副本使用,则使用此选项。在某些情况下,消耗自最接近的副本可以提高网络利用率或降低成本。topologyKey 必须与包含机架 ID 的节点标签匹配。此配置中使用的示例使用标准 topology.kubernetes.io/zone 标签指定区。要从最接近的副本使用,请在 Kafka 代理配置中启用 RackAwareReplicaSelector
    43
    模板自定义。这里调度了带有反关联性的 pod,因此不会将 pod 调度到具有相同主机名的节点。
    44
    还使用 Jaeger 为分布式追踪 设置环境变量。
    45
    46
    OpenShift Secret 的外部配置 作为环境变量挂载到 Kafka MirrorMaker。您还可以使用 配置供应商插件 从外部源 加载配置值
  2. 创建或更新资源:

    oc apply -f MIRRORMAKER-CONFIGURATION-FILE
    Copy to Clipboard Toggle word wrap

2.5.8. 保护 Kafka MirrorMaker 2.0 部署

此流程描述了保护镜像Maker 2.0 部署所需的配置。

您需要对源 Kafka 集群和目标 Kafka 集群进行单独的配置。您还需要单独的用户配置来提供 MirrorMaker 连接到源和目标 Kafka 集群所需的凭证。

对于 Kafka 集群,您可以指定 OpenShift 集群内安全连接的内部监听程序,以及用于 OpenShift 集群外的连接的外部监听程序。

您可以配置身份验证和授权机制。为源和目标 Kafka 集群实施的安全选项必须与为 MirrorMaker 2.0 实施的安全选项兼容。

创建集群和用户身份验证凭据后,您可以在 mirrorMaker 配置中为安全连接指定它们。

注意

在这一流程中,会使用 Cluster Operator 生成的证书,但您可以 通过安装自己的证书 来替换它们。您还可以将监听程序 配置为使用由外部证书颁发机构管理的 Kafka 侦听器证书

开始前

在开始这个过程前,请查看 AMQ Streams 提供的示例配置文件。它们包括使用 TLS 或 SCRAM-SHA-512 身份验证保护镜像的 2.0 部署的示例。示例指定用于在 OpenShift 集群内连接的内部监听程序。

示例还显示 MirrorMaker 2.0 所需的 ACL,以允许在源和目标 Kafka 集群上操作。

先决条件

  • AMQ Streams 处于运行状态
  • 源和目标集群的独立命名空间

此流程假设将源和目标 Kafka 集群安装到单独的命名空间中,如果要使用 Topic Operator,您需要这样做。Topic Operator 只监视指定命名空间中的单个集群。

通过将集群划分为命名空间,您需要复制集群 secret,以便可以在命名空间外访问集群 secret。您需要在 imagesMaker 配置中引用 secret。

流程

  1. 配置两个 Kafka 资源,一个用于保护源 Kafka 集群,另一个来保护目标 Kafka 集群。

    您可以添加用于身份验证的监听程序配置并启用授权。

    在本例中,为使用 TLS 加密和身份验证的 Kafka 集群配置内部监听程序。启用 Kafka 简单 授权。

    使用 TLS 身份验证的源 Kafka 集群配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-source-cluster
    spec:
      kafka:
        version: 3.2.3
        replicas: 1
        listeners:
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication:
              type: tls
        authorization:
          type: simple
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
          inter.broker.protocol.version: "3.2"
        storage:
          type: jbod
          volumes:
          - id: 0
            type: persistent-claim
            size: 100Gi
            deleteClaim: false
      zookeeper:
        replicas: 1
        storage:
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
      entityOperator:
        topicOperator: {}
        userOperator: {}
    Copy to Clipboard Toggle word wrap

    使用 TLS 身份验证的目标 Kafka 集群配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-target-cluster
    spec:
      kafka:
        version: 3.2.3
        replicas: 1
        listeners:
          - name: tls
            port: 9093
            type: internal
            tls: true
            authentication:
              type: tls
        authorization:
          type: simple
        config:
          offsets.topic.replication.factor: 1
          transaction.state.log.replication.factor: 1
          transaction.state.log.min.isr: 1
          default.replication.factor: 1
          min.insync.replicas: 1
          inter.broker.protocol.version: "3.2"
        storage:
          type: jbod
          volumes:
            - id: 0
              type: persistent-claim
              size: 100Gi
              deleteClaim: false
      zookeeper:
        replicas: 1
        storage:
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
      entityOperator:
        topicOperator: {}
        userOperator: {}
    Copy to Clipboard Toggle word wrap

  2. 在单独的命名空间中创建或更新 Kafka 资源。

    oc apply -f <kafka_configuration_file> -n <namespace>
    Copy to Clipboard Toggle word wrap

    Cluster Operator 创建监听程序并设置集群和客户端证书颁发机构(CA)证书,以便在 Kafka 集群中启用身份验证。

    证书在 secret < cluster_name> -cluster-ca-cert 中创建。

  3. 配置两个 KafkaUser 资源,一个用于源 Kafka 集群的用户,一个用于目标 Kafka 集群的用户。

    1. 配置与对应的源和目标 Kafka 集群相同的身份验证和授权类型。例如,如果您在源 Kafka 集群的 Kafka 配置中使用了 tls 验证和 simple 授权类型,请在 KafkaUser 配置中使用相同的。
    2. 配置 MirrorMaker 2.0 所需的 ACL,以允许在源和目标 Kafka 集群上执行操作。

      ACL 由内部 MirrorMaker 连接器以及底层 Kafka 连接框架使用。

    TLS 客户端身份验证的源用户配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-source-user
      labels:
        strimzi.io/cluster: my-source-cluster
    spec:
      authentication:
        type: tls
      authorization:
        type: simple
        acls:
          # MirrorSourceConnector
          - resource: # Not needed if offset-syncs.topic.location=target
              type: topic
              name: mm2-offset-syncs.my-target-cluster.internal
            operation: Create
          - resource: # Not needed if offset-syncs.topic.location=target
              type: topic
              name: mm2-offset-syncs.my-target-cluster.internal
            operation: DescribeConfigs
          - resource: # Not needed if offset-syncs.topic.location=target
              type: topic
              name: mm2-offset-syncs.my-target-cluster.internal
            operation: Write
          - resource: # Needed for every topic which is mirrored
              type: topic
              name: "*"
            operation: Read
          - resource: # Needed for every topic which is mirrored
              type: topic
              name: "*"
            operation: DescribeConfigs
          # MirrorCheckpointConnector
          - resource:
              type: cluster
            operation: Describe
          - resource: # Needed for every group for which offsets are synced
              type: group
              name: "*"
            operation: Describe
          - resource: # Not needed if offset-syncs.topic.location=target
              type: topic
              name: mm2-offset-syncs.my-target-cluster.internal
            operation: Read
    Copy to Clipboard Toggle word wrap

    TLS 客户端身份验证的目标用户配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaUser
    metadata:
      name: my-target-user
      labels:
        strimzi.io/cluster: my-target-cluster
    spec:
      authentication:
        type: tls
      authorization:
        type: simple
        acls:
          # Underlying Kafka Connect internal topics to store configuration, offsets, or status
          - resource:
              type: group
              name: mirrormaker2-cluster
            operation: Read
          - resource:
              type: topic
              name: mirrormaker2-cluster-configs
            operation: Read
          - resource:
              type: topic
              name: mirrormaker2-cluster-configs
            operation: Describe
          - resource:
              type: topic
              name: mirrormaker2-cluster-configs
            operation: DescribeConfigs
          - resource:
              type: topic
              name: mirrormaker2-cluster-configs
            operation: Write
          - resource:
              type: topic
              name: mirrormaker2-cluster-configs
            operation: Create
          - resource:
              type: topic
              name: mirrormaker2-cluster-status
            operation: Read
          - resource:
              type: topic
              name: mirrormaker2-cluster-status
            operation: Describe
          - resource:
              type: topic
              name: mirrormaker2-cluster-status
            operation: DescribeConfigs
          - resource:
              type: topic
              name: mirrormaker2-cluster-status
            operation: Write
          - resource:
              type: topic
              name: mirrormaker2-cluster-status
            operation: Create
          - resource:
              type: topic
              name: mirrormaker2-cluster-offsets
            operation: Read
          - resource:
              type: topic
              name: mirrormaker2-cluster-offsets
            operation: Write
          - resource:
              type: topic
              name: mirrormaker2-cluster-offsets
            operation: Describe
          - resource:
              type: topic
              name: mirrormaker2-cluster-offsets
            operation: DescribeConfigs
          - resource:
              type: topic
              name: mirrormaker2-cluster-offsets
            operation: Create
          # MirrorSourceConnector
          - resource: # Needed for every topic which is mirrored
              type: topic
              name: "*"
            operation: Create
          - resource: # Needed for every topic which is mirrored
              type: topic
              name: "*"
            operation: Alter
          - resource: # Needed for every topic which is mirrored
              type: topic
              name: "*"
            operation: AlterConfigs
          - resource: # Needed for every topic which is mirrored
              type: topic
              name: "*"
            operation: Write
          # MirrorCheckpointConnector
          - resource:
              type: cluster
            operation: Describe
          - resource:
              type: topic
              name: my-source-cluster.checkpoints.internal
            operation: Create
          - resource:
              type: topic
              name: my-source-cluster.checkpoints.internal
            operation: Describe
          - resource:
              type: topic
              name: my-source-cluster.checkpoints.internal
            operation: Write
          - resource: # Needed for every group for which the offset is synced
              type: group
              name: "*"
            operation: Read
          - resource: # Needed for every group for which the offset is synced
              type: group
              name: "*"
            operation: Describe
          - resource: # Needed for every topic which is mirrored
              type: topic
              name: "*"
            operation: Read
          # MirrorHeartbeatConnector
          - resource:
              type: topic
              name: heartbeats
            operation: Create
          - resource:
              type: topic
              name: heartbeats
            operation: Describe
          - resource:
              type: topic
              name: heartbeats
            operation: Write
    Copy to Clipboard Toggle word wrap

    注意

    您可以通过将 type 设置为 tls-external 来使用 User Operator 外部发布的证书。如需更多信息,请参阅 用户身份验证

  4. 在您为源和目标集群创建的每个命名空间中创建或更新 KafkaUser 资源。

    oc apply -f <kafka_user_configuration_file> -n <namespace>
    Copy to Clipboard Toggle word wrap

    User Operator 根据所选的身份验证类型创建代表客户端(MirrorMaker)和用于客户端身份验证的安全凭证的用户。

    User Operator 会创建一个与 KafkaUser 资源名称相同的新 secret。secret 包含用于 TLS 客户端身份验证的私钥和公钥。公钥包含在用户证书中,由客户端证书颁发机构(CA)签名。

  5. 使用身份验证详情配置 KafkaMirrorMaker2 资源,以连接到源和目标集群。

    带有 TLS 身份验证的 MirrorMaker 2.0 配置示例

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mirror-maker-2
    spec:
      version: 3.2.3
      replicas: 1
      connectCluster: "my-target-cluster"
      clusters:
        - alias: "my-source-cluster"
          bootstrapServers: my-source-cluster-kafka-bootstrap:9093
          tls: 
    1
    
            trustedCertificates:
              - secretName: my-source-cluster-cluster-ca-cert
                certificate: ca.crt
          authentication: 
    2
    
            type: tls
            certificateAndKey:
              secretName: my-source-user
              certificate: user.crt
              key: user.key
        - alias: "my-target-cluster"
          bootstrapServers: my-target-cluster-kafka-bootstrap:9093
          tls: 
    3
    
            trustedCertificates:
              - secretName: my-target-cluster-cluster-ca-cert
                certificate: ca.crt
          authentication: 
    4
    
            type: tls
            certificateAndKey:
              secretName: my-target-user
              certificate: user.crt
              key: user.key
          config:
            # -1 means it will use the default replication factor configured in the broker
            config.storage.replication.factor: -1
            offset.storage.replication.factor: -1
            status.storage.replication.factor: -1
      mirrors:
        - sourceCluster: "my-source-cluster"
          targetCluster: "my-target-cluster"
          sourceConnector:
            config:
              replication.factor: 1
              offset-syncs.topic.replication.factor: 1
              sync.topic.acls.enabled: "false"
          heartbeatConnector:
            config:
              heartbeats.topic.replication.factor: 1
          checkpointConnector:
            config:
              checkpoints.topic.replication.factor: 1
              sync.group.offsets.enabled: "true"
          topicsPattern: "topic1|topic2|topic3"
          groupsPattern: "group1|group2|group3"
    Copy to Clipboard Toggle word wrap

    1
    源 Kafka 集群的 TLS 证书。如果它们位于单独的命名空间中,请从 Kafka 集群的命名空间复制集群 secret。
    2
    使用 TLS 机制 访问源 Kafka 集群的用户身份验证。
    3
    目标 Kafka 集群的 TLS 证书。
    4
    访问目标 Kafka 集群的用户身份验证。
  6. 在与目标 Kafka 集群相同的命名空间中创建或更新 KafkaMirrorMaker2 资源。

    oc apply -f <mirrormaker2_configuration_file> -n <namespace_of_target_cluster>
    Copy to Clipboard Toggle word wrap

2.5.9. 执行 Kafka MirrorMaker 2.0 连接器重启

此流程描述了如何使用 OpenShift 注解手动触发 Kafka MirrorMaker 2.0 连接器的重启。

先决条件

  • Cluster Operator 正在运行。

流程

  1. 查找控制要重启的 Kafka MirrorMaker 2.0 连接器的 KafkaMirrorMaker2 自定义资源的名称:

    oc get KafkaMirrorMaker2
    Copy to Clipboard Toggle word wrap
  2. 查找 Kafka MirrorMaker 2.0 连接器的名称,以便从 KafkaMirrorMaker2 自定义资源重启。

    oc describe KafkaMirrorMaker2 KAFKAMIRRORMAKER-2-NAME
    Copy to Clipboard Toggle word wrap
  3. 要重启连接器,请在 OpenShift 中注解 KafkaMirrorMaker2 资源。在本例中,oc annotate 重启名为 my-source->my-target.MirrorSourceConnector 的连接器

    oc annotate KafkaMirrorMaker2 KAFKAMIRRORMAKER-2-NAME "strimzi.io/restart-connector=my-source->my-target.MirrorSourceConnector"
    Copy to Clipboard Toggle word wrap
  4. 等待下一次协调发生(默认为两分钟)。

    Kafka MirrorMaker 2.0 连接器将重启,只要该注解由协调过程检测到。当接受重启请求时,注解会从 KafkaMirrorMaker2 自定义资源中删除。

2.5.10. 执行 Kafka MirrorMaker 2.0 连接器任务重启

此流程描述了如何使用 OpenShift 注解手动触发 Kafka MirrorMaker 2.0 连接器任务的重启。

先决条件

  • Cluster Operator 正在运行。

流程

  1. 查找控制要重启的 Kafka MirrorMaker 2.0 连接器的 KafkaMirrorMaker2 自定义资源的名称:

    oc get KafkaMirrorMaker2
    Copy to Clipboard Toggle word wrap
  2. 查找 Kafka MirrorMaker 2.0 连接器的名称以及要从 KafkaMirrorMaker2 自定义资源重启的任务 ID。任务 ID 是非负的整数,从 0 开始。

    oc describe KafkaMirrorMaker2 KAFKAMIRRORMAKER-2-NAME
    Copy to Clipboard Toggle word wrap
  3. 要重启连接器任务,请在 OpenShift 中注解 KafkaMirrorMaker2 资源。在本例中,oc annotate 重启名为 my-source->my-target.MirrorSourceConnector 的连接器的任务 0

    oc annotate KafkaMirrorMaker2 KAFKAMIRRORMAKER-2-NAME "strimzi.io/restart-connector-task=my-source->my-target.MirrorSourceConnector:0"
    Copy to Clipboard Toggle word wrap
  4. 等待下一次协调发生(默认为两分钟)。

    Kafka MirrorMaker 2.0 连接器任务会重启,只要该注解由协调过程检测到。当重启任务请求被接受时,注解会从 KafkaMirrorMaker2 自定义资源中删除。

返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat