2.4.4. 使用 MirrorMaker 2.0 在 Kafka 集群间同步数据
使用 MirrorMaker 2.0 通过配置同步 Kafka 集群之间的数据。
配置必须指定:
- 每个 Kafka 集群
- 每个集群的连接信息,包括 TLS 身份验证
复制流和方向
- 集群到集群
- 主题
使用 KafkaMirrorMaker2
资源的属性来配置 Kafka MirrorMaker 2.0 部署。
注意
以前版本的 MirrorMaker 继续受到支持。如果要使用为之前版本配置的资源,则必须将其更新为 MirrorMaker 2.0 所支持的格式。
MirrorMaker 2.0 为复制因素等属性提供默认配置值。最小配置(默认值保持不变)会类似如下:
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker2 spec: version: 2.8.0 connectCluster: "my-cluster-target" clusters: - alias: "my-cluster-source" bootstrapServers: my-cluster-source-kafka-bootstrap:9092 - alias: "my-cluster-target" bootstrapServers: my-cluster-target-kafka-bootstrap:9092 mirrors: - sourceCluster: "my-cluster-source" targetCluster: "my-cluster-target" sourceConnector: {}
您可以使用 TLS 或 SASL 身份验证为源和目标集群配置访问控制。此流程显示为源和目标集群使用 TLS 加密和身份验证的配置。
先决条件
有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams :
- 源和目标 Kafka 集群必须可用
流程
编辑
KafkaMirrorMaker2
资源的spec
属性。您可以配置的属性显示在此示例配置中:
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker2 spec: version: 2.8.0 1 replicas: 3 2 connectCluster: "my-cluster-target" 3 clusters: 4 - alias: "my-cluster-source" 5 authentication: 6 certificateAndKey: certificate: source.crt key: source.key secretName: my-user-source type: tls bootstrapServers: my-cluster-source-kafka-bootstrap:9092 7 tls: 8 trustedCertificates: - certificate: ca.crt secretName: my-cluster-source-cluster-ca-cert - alias: "my-cluster-target" 9 authentication: 10 certificateAndKey: certificate: target.crt key: target.key secretName: my-user-target type: tls bootstrapServers: my-cluster-target-kafka-bootstrap:9092 11 config: 12 config.storage.replication.factor: 1 offset.storage.replication.factor: 1 status.storage.replication.factor: 1 ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 13 ssl.enabled.protocols: "TLSv1.2" ssl.protocol: "TLSv1.2" ssl.endpoint.identification.algorithm: HTTPS 14 tls: 15 trustedCertificates: - certificate: ca.crt secretName: my-cluster-target-cluster-ca-cert mirrors: 16 - sourceCluster: "my-cluster-source" 17 targetCluster: "my-cluster-target" 18 sourceConnector: 19 tasksMax: 10 20 config: replication.factor: 1 21 offset-syncs.topic.replication.factor: 1 22 sync.topic.acls.enabled: "false" 23 refresh.topics.interval.seconds: 60 24 replication.policy.separator: "" 25 replication.policy.class: "io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy" 26 heartbeatConnector: 27 config: heartbeats.topic.replication.factor: 1 28 checkpointConnector: 29 config: checkpoints.topic.replication.factor: 1 30 refresh.groups.interval.seconds: 600 31 sync.group.offsets.enabled: true 32 sync.group.offsets.interval.seconds: 60 33 emit.checkpoints.interval.seconds: 60 34 replication.policy.class: "io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy" topicsPattern: ".*" 35 groupsPattern: "group1|group2|group3" 36 resources: 37 requests: cpu: "1" memory: 2Gi limits: cpu: "2" memory: 2Gi logging: 38 type: inline loggers: connect.root.logger.level: "INFO" readinessProbe: 39 initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 jvmOptions: 40 "-Xmx": "1g" "-Xms": "1g" image: my-org/my-image:latest 41 template: 42 pod: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: application operator: In values: - postgresql - mongodb topologyKey: "kubernetes.io/hostname" connectContainer: 43 env: - name: JAEGER_SERVICE_NAME value: my-jaeger-service - name: JAEGER_AGENT_HOST value: jaeger-agent-name - name: JAEGER_AGENT_PORT value: "6831" tracing: type: jaeger 44 externalConfiguration: 45 env: - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: aws-creds key: awsAccessKey - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: aws-creds key: awsSecretAccessKey
- 1
- Kafka Connect 和 Mirror Maker 2.0 版本,版本 将始终相同。
- 2
- 3
- Kafka Connect 的 Kafka 集群别名,它必须指定 目标 Kafka 集群。Kafka 集群由 Kafka Connect 用于其内部主题。
- 4
- 正在同步 的 Kafka 集群的规格。
- 5
- 源 Kafka 集群的集群 别名。
- 6
- 7
- 8
- 使用源 Kafka 集群的 TLS 证书以 X.509 格式存储的密钥名称进行 TLS 加密。如果证书存储在同一个 secret 中,则可以多次列出证书。
- 9
- 目标 Kafka 集群的集群 别名。
- 10
- 目标 Kafka 集群的身份验证的配置方式与源 Kafka 集群相同。
- 11
- 12
- Kafka Connect 配置.标准 Apache Kafka 配置可能会提供,仅限于不是由 AMQ Streams 直接管理的属性。
- 13
- 使用特定 密码套件 为 TLS 版本运行外部侦听器的 SSL 属性。
- 14
- 通过将 设置为
HTTPS
来 启用主机名验证。空字符串将禁用验证。 - 15
- 目标 Kafka 集群的 TLS 加密配置方式与源 Kafka 集群相同。
- 16
- 17
- MirrorMaker 2.0 连接器使用的源集群的集群 别名。
- 18
- MirrorMaker 2.0 连接器使用的目标集群的集群 别名。
- 19
- 创建远程主题的
MirrorSourceConnector
配置。配置
会覆盖默认配置选项。 - 20
- 连接器可以创建的任务数量上限。任务处理数据复制,并并行运行。如果基础架构支持处理开销,增加这个值可以提高吞吐量。Kafka Connect 在集群成员之间分发任务。如果任务数量超过工作程序,则为工作程序分配多项任务。对于接收器连接器,旨在为每个主题分区使用一个任务。对于源连接器,可以并行运行的任务数量也可能取决于外部系统。如果无法实现并行性,连接器会创建少于任务的最大数量。
- 21
- 在目标集群中创建的已镜像主题的复制因素。
- 22
MirrorSourceConnector
offset-syncs
内部主题的复制因素,用于映射源和目标集群的偏移。- 23
- 启用 ACL 规则同步 时,将应用 ACL 来同步主题。默认值为
true
。 - 24
- 可选设置,可更改新主题的检查频率。默认为每 10 分钟检查一次。
- 25
- 定义用于重命名远程主题的分隔符。
- 26
- 添加可覆盖远程主题自动重命名的策略。该主题不会用源集群的名称来附加名称,而是保留其原始名称。此可选设置对主动/被动备份和数据迁移很有用。要配置主题偏移同步,还必须为
checkpointConnector.config
设置此属性。 - 27
- 执行连接检查
的 MirrorHeartbeatConnector
配置。配置
会覆盖默认配置选项。 - 28
- 在目标集群中创建的心跳主题的复制因素。
- 29
- 跟踪偏移的
MirrorCheckpointConnector
配置。配置
会覆盖默认配置选项。 - 30
- 在目标集群中创建的检查点主题的复制因素。
- 31
- 可选设置,以更改新使用者组的检查频率。默认为每 10 分钟检查一次。
- 32
- 可选设置来同步消费者组偏移,这对于主动/被动配置中的恢复非常有用。默认情况下不启用同步。
- 33
- 如果启用了消费者组偏移的同步,您可以调整同步的频率。
- 34
- 调整检查偏移跟踪的频率。如果更改了偏移同步的频率,您可能还需要调整这些检查的频率。
- 35
- 从 定义为正则表达式模式的 源集群进行主题复制。这里我们请求所有主题。
- 36
- 来自 定义为正则表达式模式的 源集群的使用者组复制。在这里,我们按名称请求三个消费者组。您可以使用逗号分隔的列表。
- 37
- 38
- 指定 Kafka 连接日志记录器和日志级别 直接(
内联)或通过
ConfigMap 间接(外部
)添加。自定义 ConfigMap 必须放在log4j.properties 或
log4j2.properties
键下。对于 Kafka Connectlog4j.rootLogger
日志记录器,您可以将日志级别设置为 INFO、ERROR、WARN、TRACE、DEBUG、FATAL 或 OFF。 - 39
- 健康检查以了解 何时重新启动容器(存活度)以及容器何时可以接受流量(就绪度)。
- 40
- 用于优化运行 Kafka MirrorMaker 的虚拟机(VM)性能的 JVM 配置选项。
- 41
- ADVANCED OPTION: 容器镜像配置,只在特殊情况下推荐这样做。
- 42
- 模板自定义.在这里,pod 被调度为反关联性,因此 pod 不会调度到具有相同主机名的节点。
- 43
- 还 使用 Jaeger 为分布式追踪设置 环境变量。
- 44
- 45
创建或更新资源:
oc apply -f MIRRORMAKER-CONFIGURATION-FILE