8.6. 在专用模式下运行 MirrorMaker 2.0
使用 MirrorMaker 2.0 通过配置同步 Kafka 集群之间的数据。此流程演示了如何配置和运行专用单节点 MirrorMaker 2.0 集群。专用集群使用 Kafka 连接 worker 节点来镜像 Kafka 集群之间的数据。目前,MirrorMaker 2.0 在专用模式下只能用于单个 worker 节点。
也可以以分布式模式运行 MirrorMaker 2.0。在分布式模式中,MirrorMaker 2.0 作为连接器在 Kafka Connect 集群中运行。Kafka 为数据复制提供 MirrorMaker 源连接器。如果要使用连接器而不是运行专用 MirrorMaker 集群,则必须在 Kafka Connect 集群中配置连接器。如需更多信息,请参阅 Apache Kafka 文档。
以前版本的 MirrorMaker 将继续被支持,方法是在 传统模式下运行 MirrorMaker 2.0。
配置必须指定:
- 每个 Kafka 集群
- 每个集群的连接信息,包括 TLS 身份验证
复制流和方向
- 集群到集群
- 主题
- 复制规则
- 提交偏移跟踪间隔
此流程描述了如何通过在属性文件中创建配置来实现 MirrorMaker 2.0,然后在使用 MirrorMaker 脚本文件设置连接时传递属性。
您可以指定您要从源集群复制的主题和消费者组。您可以指定源和目标集群的名称,然后指定要复制的主题和消费者组。
在以下示例中,为从集群 1 复制到 2 来指定主题和消费者组。
复制特定主题和消费者组的示例配置
clusters=cluster-1,cluster-2
cluster-1->cluster-2.topics = topic-1, topic-2
cluster-1->cluster-2.groups = group-1, group-2
您可以提供名称或使用正则表达式的列表。默认情况下,如果没有设置这些属性,则所有主题和消费者组都会被复制。您还可以使用 .* 作为正则表达式来复制所有主题和消费者组。但是,尝试只指定您需要指定主题和消费者组,以避免在集群中造成不必要的额外负载。
开始前
在 ./config/connect-mirror-maker.properties 中提供了示例配置属性文件。
先决条件
- 您需要在您复制的每个 Kafka 集群节点的主机上安装 AMQ Streams。
流程
在文本编辑器中打开示例属性文件,或创建一个新文件,并编辑该文件使其包含连接信息以及每个 Kafka 集群的复制流程。
以下示例显示了连接两个集群( cluster-1 和 cluster-2) 的配置,双向。集群名称可以通过 cluster 属性
进行配置。MirrorMaker 2.0 配置示例
clusters=cluster-1,cluster-21 cluster-1.bootstrap.servers=<cluster_name>-kafka-bootstrap-<project_name_one>:4432 cluster-1.security.protocol=SSL3 cluster-1.ssl.truststore.password=<truststore_name> cluster-1.ssl.truststore.location=<path_to_truststore>/truststore.cluster-1.jks_ cluster-1.ssl.keystore.password=<keystore_name> cluster-1.ssl.keystore.location=<path_to_keystore>/user.cluster-1.p12 cluster-2.bootstrap.servers=<cluster_name>-kafka-bootstrap-<project_name_two>:4434 cluster-2.security.protocol=SSL5 cluster-2.ssl.truststore.password=<truststore_name> cluster-2.ssl.truststore.location=<path_to_truststore>/truststore.cluster-2.jks_ cluster-2.ssl.keystore.password=<keystore_name> cluster-2.ssl.keystore.location=<path_to_keystore>/user.cluster-2.p12 cluster-1->cluster-2.enabled=true6 cluster-2->cluster-1.enabled=true7 cluster-1->cluster-2.topics=.*8 cluster-2->cluster-1.topics=topic-1, topic-29 cluster-1->cluster-2.groups=.*10 cluster-2->cluster-1.groups=group-1, group-211 replication.policy.separator=-12 sync.topic.acls.enabled=false13 refresh.topics.interval.seconds=6014 refresh.groups.interval.seconds=6015 - 1
- 每个 Kafka 集群都使用其别名标识。
- 2
- cluster-1 的连接信息,使用 bootstrap 地址和端口 443。两个集群都使用端口 443 连接到使用 OpenShift 路由 的 Kafka。
- 3
ssl.属性定义 cluster-1 的 TLS 配置。- 4
- cluster-2 的连接信息。
- 5
ssl.属性定义 cluster-2 的 TLS 配置。- 6
- 从 cluster-1 启用了复制流到 cluster-2。
- 7
- 从 cluster-2 启用到 cluster-1 的复制流程。
- 8
- 将 cluster-1 到 cluster-2 的所有主题复制。源连接器复制指定的主题。检查点连接器可跟踪指定主题的偏移。
- 9
- 将特定主题从 cluster-2 复制到 cluster-1。
- 10
- 将所有消费者组从 cluster-1 复制到 cluster-2。检查点连接器复制指定的消费者组。
- 11
- 将特定消费者组从 cluster-2 复制到 cluster-1。
- 12
- 定义用于重命名远程主题的分隔符。
- 13
- 启用后,ACL 会被应用到同步的主题。默认值为
false。 - 14
- 检查要同步的新主题之间的周期。
- 15
- 新消费者组同步的检查之间的周期。
选项:如果需要,添加一个可覆盖自动重命名远程主题的策略。该主题不会用源集群的名称来附加名称,而是保留其原始名称。
此可选设置用于主动/被动备份和数据迁移。
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy选项:如果要同步消费者组偏移,请添加配置来启用和管理同步:
refresh.groups.interval.seconds=60 sync.group.offsets.enabled=true1 sync.group.offsets.interval.seconds=602 emit.checkpoints.interval.seconds=603 在目标集群中启动 ZooKeeper 和 Kafka:
su - kafka /opt/kafka/bin/zookeeper-server-start.sh -daemon \ /opt/kafka/config/zookeeper.properties/opt/kafka/bin/kafka-server-start.sh -daemon \ /opt/kafka/config/server.properties使用您在属性文件中定义的集群连接配置和复制策略启动 MirrorMaker:
/opt/kafka/bin/connect-mirror-maker.sh \ /opt/kafka/config/connect-mirror-maker.propertiesMirrorMaker 设置集群之间的连接。
对于每个目标集群,验证主题是否被复制:
/opt/kafka/bin/kafka-topics.sh --bootstrap-server <broker_address> --list