9.6. 以专用模式运行 MirrorMaker 2
使用 MirrorMaker 2 通过配置在 Kafka 集群间同步数据。此流程演示了如何配置和运行专用的单节点 MirrorMaker 2 集群。专用集群使用 Kafka Connect worker 节点来镜像 Kafka 集群之间的数据。目前,专用模式的 MirrorMaker 2 仅适用于单个 worker 节点。
也可以以分布式模式运行 MirrorMaker 2。在分布式模式下,M MirrorMaker 2 作为连接器在 Kafka Connect 集群中运行。Kafka 为数据复制提供 MirrorMaker 源连接器。如果要使用连接器而不是运行专用 MirrorMaker 集群,则必须在 Kafka Connect 集群中配置连接器。如需更多信息,请参阅 Apache Kafka 文档。
以前版本的 MirrorMaker 继续支持,方法是在 传统模式下运行 MirrorMaker 2。
配置必须指定:
- 每个 Kafka 集群
- 每个集群的连接信息,包括 TLS 身份验证
复制流和方向
- 集群到集群
- topic to topic
- 复制规则
- 提交偏移跟踪间隔
此流程描述了如何通过在属性文件中创建配置来实现 MirrorMaker 2,然后在使用 MirrorMaker 脚本文件设置连接时传递属性。
您可以指定您要从源集群复制的主题和消费者组。您可以指定源和目标集群的名称,然后指定要复制的主题和消费者组。
在以下示例中,为从集群 1 复制到 2 而指定了主题和消费者组。
复制特定主题和消费者组的配置示例
clusters=cluster-1,cluster-2 cluster-1->cluster-2.topics = topic-1, topic-2 cluster-1->cluster-2.groups = group-1, group-2
clusters=cluster-1,cluster-2
cluster-1->cluster-2.topics = topic-1, topic-2
cluster-1->cluster-2.groups = group-1, group-2
您可以提供名称或使用正则表达式列表。默认情况下,如果您未设置这些属性,将复制所有主题和消费者组。您还可以使用 .* 作为正则表达式来复制所有主题和消费者组。但是,尝试只指定您需要指定主题和消费者组,以避免在集群中造成不必要的额外负载。
开始前
示例配置属性文件在 ./config/connect-mirror-maker.properties 中提供。
先决条件
- 您需要在您复制的每个 Kafka 集群节点上安装 AMQ Streams。
流程
在文本编辑器中打开示例属性文件,或创建一个新文件,并编辑该文件使其包含连接信息以及每个 Kafka 集群的复制流程。
以下示例显示了连接两个集群的配置,即 cluster-1 和 cluster-2,双向方式。集群名称可通过 cluster 属性
进行配置。MirrorMaker 2 配置示例
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 1
- 每个 Kafka 集群都使用其别名来标识。
- 2
- 使用 bootstrap 地址和端口 443 连接 cluster-1 的连接信息。两个集群都使用端口 443 连接到使用 OpenShift Routes 的 Kafka。
- 3
ssl.属性定义 cluster-1 的 TLS 配置。- 4
- cluster-2 的连接信息。
- 5
ssl.属性定义 cluster-2 的 TLS 配置。- 6
- 从 cluster-1 启用的复制流到 cluster-2。
- 7
- 从 cluster-2 启用的复制流到 cluster-1。
- 8
- 将所有主题从 cluster-1 复制到 cluster-2。源连接器复制指定的主题。checkpoint 连接器跟踪指定主题的偏移。
- 9
- 将特定主题从 cluster-2 复制到 cluster-1。
- 10
- 将所有消费者组从 cluster-1 复制到 cluster-2。checkpoint 连接器复制指定的消费者组。
- 11
- 将特定消费者组从 cluster-2 复制到 cluster-1。
- 12
- 定义用于重命名远程主题的分隔符。
- 13
- 启用后,会应用 ACL 来同步主题。默认值为
false。 - 14
- 检查要同步的新主题之间的周期。
- 15
- 检查要同步的新消费者组之间的周期。
OPTION:如果需要,添加一个策略来覆盖远程主题的自动重命名。该主题不会用源集群的名称来附加名称,而是保留其原始名称。
此可选设置用于主动/被动备份和数据迁移。
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicyCopy to Clipboard Copied! Toggle word wrap Toggle overflow OPTION:如果要同步消费者组偏移,请添加配置来启用和管理同步:
refresh.groups.interval.seconds=60 sync.group.offsets.enabled=true sync.group.offsets.interval.seconds=60 emit.checkpoints.interval.seconds=60
refresh.groups.interval.seconds=60 sync.group.offsets.enabled=true1 sync.group.offsets.interval.seconds=602 emit.checkpoints.interval.seconds=603 Copy to Clipboard Copied! Toggle word wrap Toggle overflow 在目标集群中启动 ZooKeeper 和 Kafka:
su - kafka /opt/kafka/bin/zookeeper-server-start.sh -daemon \ /opt/kafka/config/zookeeper.properties
su - kafka /opt/kafka/bin/zookeeper-server-start.sh -daemon \ /opt/kafka/config/zookeeper.propertiesCopy to Clipboard Copied! Toggle word wrap Toggle overflow /opt/kafka/bin/kafka-server-start.sh -daemon \ /opt/kafka/config/server.properties
/opt/kafka/bin/kafka-server-start.sh -daemon \ /opt/kafka/config/server.propertiesCopy to Clipboard Copied! Toggle word wrap Toggle overflow 使用您在属性文件中定义的集群连接配置和复制策略启动 MirrorMaker:
/opt/kafka/bin/connect-mirror-maker.sh \ /opt/kafka/config/connect-mirror-maker.properties
/opt/kafka/bin/connect-mirror-maker.sh \ /opt/kafka/config/connect-mirror-maker.propertiesCopy to Clipboard Copied! Toggle word wrap Toggle overflow MirrorMaker 在集群之间设置连接。
对于每个目标集群,验证主题是否被复制:
/opt/kafka/bin/kafka-topics.sh --bootstrap-server <broker_address> --list
/opt/kafka/bin/kafka-topics.sh --bootstrap-server <broker_address> --listCopy to Clipboard Copied! Toggle word wrap Toggle overflow