8.7. 在专用模式下运行 MirrorMaker 2.0
使用 MirrorMaker 2.0 通过配置在 Kafka 集群间同步数据。此流程演示了如何配置和运行专用的单节点 mirrorMaker 2.0 集群。专用集群使用 Kafka Connect worker 节点来镜像 Kafka 集群之间的数据。目前,MirrorMaker 2.0 在专用模式下只能用于单个 worker 节点。
也可以在分布式模式下运行 MirrorMaker 2.0。在分布式模式中,mirrorMaker 2.0 作为 Kafka Connect 集群中的连接器运行。Kafka 为数据复制提供 MirrorMaker 源连接器。如果要使用连接器而不是运行专用 mirrorMaker 集群,必须在 Kafka Connect 集群中配置连接器。如需更多信息,请参阅 Apache Kafka 文档。
在传统模式中运行 MirrorMaker 2.0,之前版本的 MirrorMaker 将继续被支持。
配置必须指定:
- 每个 Kafka 集群
- 每个集群的连接信息,包括 TLS 身份验证
复制流和方向
- 集群到集群
- 主题
- 复制规则
- 处理的偏移跟踪间隔
此流程描述了如何通过在属性文件中创建配置来实现 MirrorMaker 2.0,然后在使用 MirrorMaker 脚本文件设置连接时传递属性。
您可以指定您想要从源集群复制的主题和消费者组。您可以指定源和目标集群的名称,然后指定要复制的主题和消费者组。
在以下示例中,指定了主题和消费者组,用于将从集群 1 复制到 2。
复制特定主题和消费者组的示例
clusters=cluster-1,cluster-2
cluster-1->cluster-2.topics = topic-1, topic-2
cluster-1->cluster-2.groups = group-1, group-2
您可以提供名称或使用正则表达式的列表。默认情况下,如果没有设置这些属性,则会复制所有主题和消费者组。您还可以使用 .* 作为正则表达式来复制所有主题和消费者组。但是,尝试只指定您需要指定主题和消费者组,以避免在集群中造成不必要的额外负载。
开始前
在 ./config/connect-mirror-maker.properties 中提供了配置示例配置属性文件。
先决条件
- 您需要在要复制的每个 Kafka 集群节点的主机上安装 AMQ Streams。
流程
在文本编辑器中打开示例属性文件,或创建一个新文件,并编辑该文件使其包含连接信息以及每个 Kafka 集群的复制流程。
以下示例显示了连接两个集群( cluster-1 和 cluster-2,双向)的配置。集群名称可通过集群属性
进行配置。MirrorMaker 2.0 配置示例
clusters=cluster-1,cluster-21 cluster-1.bootstrap.servers=<cluster_name>-kafka-bootstrap-<project_name_one>:4432 cluster-1.security.protocol=SSL3 cluster-1.ssl.truststore.password=<truststore_name> cluster-1.ssl.truststore.location=<path_to_truststore>/truststore.cluster-1.jks_ cluster-1.ssl.keystore.password=<keystore_name> cluster-1.ssl.keystore.location=<path_to_keystore>/user.cluster-1.p12_ cluster-2.bootstrap.servers=<cluster_name>-kafka-bootstrap-<project_name_two>:4434 cluster-2.security.protocol=SSL5 cluster-2.ssl.truststore.password=<truststore_name> cluster-2.ssl.truststore.location=<path_to_truststore>/truststore.cluster-2.jks_ cluster-2.ssl.keystore.password=<keystore_name> cluster-2.ssl.keystore.location=<path_to_keystore>/user.cluster-2.p12_ cluster-1->cluster-2.enabled=true6 cluster-2->cluster-1.enabled=true7 cluster-1->cluster-2.topics=.*8 cluster-2->cluster-1.topics=topic-1, topic-29 cluster-1->cluster-2.groups=.*10 cluster-2->cluster-1.groups=group-1, group-211 replication.policy.separator=-12 sync.topic.acls.enabled=false13 refresh.topics.interval.seconds=6014 refresh.groups.interval.seconds=6015 - 1
- 每个 Kafka 集群都使用其别名来标识。
- 2
- cluster-1 的连接信息,使用 bootstrap 地址和端口 443。两个集群都使用端口 443 来使用 OpenShift 路由 连接到 Kafka。
- 3
ssl.属性定义 cluster-1 的 TLS 配置。- 4
- cluster-2 的连接信息。
- 5
ssl.属性定义 cluster-2 的 TLS 配置。- 6
- 从 cluster-1 到 cluster-2 的复制流。
- 7
- 从 cluster-2 到 cluster-1 启用了复制流。
- 8
- 将所有主题从 cluster-1 复制到 cluster-2。源连接器复制指定的主题。Checkpoint 连接器跟踪指定主题的偏移量。
- 9
- 将特定主题从 cluster-2 复制到 cluster-1。
- 10
- 将所有消费者组从 cluster-1 复制到 cluster-2。checkpoint 连接器复制指定的使用者组。
- 11
- 将特定消费者组从 cluster-2 复制到 cluster-1。
- 12
- 定义用于重命名远程主题的分隔符。
- 13
- 启用后,ACL 会被应用到同步的主题。默认值为
false。 - 14
- 用于检查要同步的新主题之间的周期。
- 15
- 要同步的新消费者组之间的周期。
选项:如果需要,请添加覆盖远程主题自动重命名的策略。该主题不会用源集群的名称来附加名称,而是保留其原始名称。
此可选设置用于主动/被动备份和数据迁移。
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy选项:如果要同步消费者组偏移,请添加配置来启用和管理同步:
refresh.groups.interval.seconds=60 sync.group.offsets.enabled=true1 sync.group.offsets.interval.seconds=602 emit.checkpoints.interval.seconds=603 在目标集群中启动 ZooKeeper 和 Kafka:
su - kafka /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties使用您在属性文件中定义的集群连接配置和复制策略启动 MirrorMaker:
/opt/kafka/bin/connect-mirror-maker.sh /config/connect-mirror-maker.propertiesMirrorMaker 在集群之间设置连接。
对于每个目标集群,验证是否复制主题:
/opt/kafka/bin/kafka-topics.sh --bootstrap-server <broker_address> --list