2.4.4. 使用 MirrorMaker 2.0 在 Kafka 集群间同步数据
使用 MirrorMaker 2.0 通过配置同步 Kafka 集群之间的数据。
配置必须指定:
- 每个 Kafka 集群
- 每个集群的连接信息,包括 TLS 身份验证
复制流和方向
- 集群到集群
- 主题
使用 KafkaMirrorMaker2
资源的属性来配置 Kafka MirrorMaker 2.0 部署。
注意
以前版本的 MirrorMaker 继续受到支持。如果要使用为之前版本配置的资源,则必须将其更新为 MirrorMaker 2.0 所支持的格式。
MirrorMaker 2.0 为复制因素等属性提供默认配置值。最小配置(默认值保持不变)会类似如下:
apiVersion: kafka.strimzi.io/v1alpha1 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker2 spec: version: 2.6.0 connectCluster: "my-cluster-target" clusters: - alias: "my-cluster-source" bootstrapServers: my-cluster-source-kafka-bootstrap:9092 - alias: "my-cluster-target" bootstrapServers: my-cluster-target-kafka-bootstrap:9092 mirrors: - sourceCluster: "my-cluster-source" targetCluster: "my-cluster-target" sourceConnector: {}
您可以使用 TLS 或 SASL 身份验证为源和目标集群配置访问控制。此流程显示为源和目标集群使用 TLS 加密和身份验证的配置。
先决条件
有关运行 以下的说明,请参阅 OpenShift 指南中的部署和升级 AMQ Streams :
- 源和目标 Kafka 集群必须可用
步骤
编辑
KafkaMirrorMaker2
资源的spec
属性。您可以配置的属性显示在此示例配置中:
apiVersion: kafka.strimzi.io/v1alpha1 kind: KafkaMirrorMaker2 metadata: name: my-mirror-maker2 spec: version: 2.6.0 1 replicas: 3 2 connectCluster: "my-cluster-target" 3 clusters: 4 - alias: "my-cluster-source" 5 authentication: 6 certificateAndKey: certificate: source.crt key: source.key secretName: my-user-source type: tls bootstrapServers: my-cluster-source-kafka-bootstrap:9092 7 tls: 8 trustedCertificates: - certificate: ca.crt secretName: my-cluster-source-cluster-ca-cert - alias: "my-cluster-target" 9 authentication: 10 certificateAndKey: certificate: target.crt key: target.key secretName: my-user-target type: tls bootstrapServers: my-cluster-target-kafka-bootstrap:9092 11 config: 12 config.storage.replication.factor: 1 offset.storage.replication.factor: 1 status.storage.replication.factor: 1 ssl.cipher.suites: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" 13 ssl.enabled.protocols: "TLSv1.2" ssl.protocol: "TLSv1.2" ssl.endpoint.identification.algorithm: HTTPS 14 tls: 15 trustedCertificates: - certificate: ca.crt secretName: my-cluster-target-cluster-ca-cert mirrors: 16 - sourceCluster: "my-cluster-source" 17 targetCluster: "my-cluster-target" 18 sourceConnector: 19 config: replication.factor: 1 20 offset-syncs.topic.replication.factor: 1 21 sync.topic.acls.enabled: "false" 22 replication.policy.separator: "" 23 replication.policy.class: "io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy" 24 heartbeatConnector: 25 config: heartbeats.topic.replication.factor: 1 26 checkpointConnector: 27 config: checkpoints.topic.replication.factor: 1 28 topicsPattern: ".*" 29 groupsPattern: "group1|group2|group3" 30 resources: 31 requests: cpu: "1" memory: 2Gi limits: cpu: "2" memory: 2Gi logging: 32 type: inline loggers: connect.root.logger.level: "INFO" readinessProbe: 33 initialDelaySeconds: 15 timeoutSeconds: 5 livenessProbe: initialDelaySeconds: 15 timeoutSeconds: 5 jvmOptions: 34 "-Xmx": "1g" "-Xms": "1g" image: my-org/my-image:latest 35 template: 36 pod: affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: application operator: In values: - postgresql - mongodb topologyKey: "kubernetes.io/hostname" connectContainer: 37 env: - name: JAEGER_SERVICE_NAME value: my-jaeger-service - name: JAEGER_AGENT_HOST value: jaeger-agent-name - name: JAEGER_AGENT_PORT value: "6831" tracing: type: jaeger 38 externalConfiguration: 39 env: - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: aws-creds key: awsAccessKey - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: aws-creds key: awsSecretAccessKey
- 1
- Kafka Connect 版本。
- 2
- 3
- Kafka Connect 的集群 别名.
- 4
- 正在同步 的 Kafka 集群的规格。
- 5
- 源 Kafka 集群的集群 别名。
- 6
- 7
- 8
- 使用源 Kafka 集群的 TLS 证书以 X.509 格式存储的密钥名称进行 TLS 加密。如果证书存储在同一个 secret 中,则可以多次列出证书。
- 9
- 目标 Kafka 集群的集群 别名。
- 10
- 目标 Kafka 集群的身份验证的配置方式与源 Kafka 集群相同。
- 11
- 12
- Kafka Connect 配置.标准 Apache Kafka 配置可能会提供,仅限于不是由 AMQ Streams 直接管理的属性。
- 13
- 使用特定 密码套件 为 TLS 版本运行外部侦听器的 SSL 属性。
- 14
- 通过将 设置为
HTTPS
来 启用主机名验证。空字符串将禁用验证。 - 15
- 目标 Kafka 集群的 TLS 加密配置方式与源 Kafka 集群相同。
- 16
- 17
- MirrorMaker 2.0 连接器使用的源集群的集群 别名。
- 18
- MirrorMaker 2.0 连接器使用的目标集群的集群 别名。
- 19
- 创建远程主题的
MirrorSourceConnector
配置。配置
会覆盖默认配置选项。 - 20
- 在目标集群中创建的已镜像主题的复制因素。
- 21
MirrorSourceConnector
offset-syncs
内部主题的复制因素,用于映射源和目标集群的偏移。- 22
- 启用 ACL 规则同步 时,将应用 ACL 来同步主题。默认值是
true
。 - 23
- 定义用于重命名远程主题的分隔符。
- 24
- 添加可覆盖远程主题自动重命名的策略。该主题不会用源集群的名称来附加名称,而是保留其原始名称。此可选设置对主动/被动备份和数据迁移很有用。
- 25
- 执行连接检查
的 MirrorHeartbeatConnector
配置。配置
会覆盖默认配置选项。 - 26
- 在目标集群中创建的心跳主题的复制因素。
- 27
- 跟踪偏移的
MirrorCheckpointConnector
配置。配置
会覆盖默认配置选项。 - 28
- 在目标集群中创建的检查点主题的复制因素。
- 29
- 从 定义为正则表达式模式的 源集群进行主题复制。这里我们请求所有主题。
- 30
- 来自 定义为正则表达式模式的 源集群的使用者组复制。在这里,我们按名称请求三个消费者组。您可以使用逗号分隔的列表。
- 31
- 32
- 指定 Kafka 连接日志记录器和日志级别 直接(
内联)或通过
ConfigMap 间接(外部
)添加。自定义 ConfigMap 必须放在log4j.properties 或
log4j2.properties
键下。对于 Kafka Connectlog4j.rootLogger
日志记录器,您可以将日志级别设置为 INFO、ERROR、WARN、TRACE、DEBUG、FATAL 或 OFF。 - 33
- 健康检查以了解 何时重新启动容器(存活度)以及容器何时可以接受流量(就绪度)。
- 34
- 用于优化运行 Kafka MirrorMaker 的虚拟机(VM)性能的 JVM 配置选项。
- 35
- 高级选项:容器镜像配置,建议仅在特殊情况下使用。
- 36
- 模板自定义.在这里,pod 被调度为反关联性,因此 pod 不会调度到具有相同主机名的节点。
- 37
- 还 使用 Jaeger 为分布式追踪设置 环境变量。
- 38
- 39
- 作为环境变量挂载到 Kafka MirrorMaker 的 OpenShift Secret 的外部配置。
创建或更新资源:
oc apply -f <your-file>