6.3. 扩展集群


6.3.1. 扩展 Kafka 集群

6.3.1.1. 在集群中添加代理

增加主题吞吐量的主要方法是增加该主题的分区数。这可以正常工作,因为分区允许在集群中的代理之间共享该主题的负载。当某些资源(通常是 I/O)的限制代理时,使用更多分区不会增加吞吐量。反之,您必须在集群中添加代理。

当您向集群添加额外的代理时,AMQ Streams 不会自动将任何分区分配给它。您必须决定将哪些分区从现有代理移到新代理中。

在所有代理间重新分发分区后,每个代理都应该有较低的资源使用率。

6.3.1.2. 从集群中删除代理

从集群中删除代理前,您必须确保它没有被分配给任何分区。您应该决定哪些剩余的代理将负责代理中的每个分区被停用。代理没有分配的分区后,您可以停止它。

6.3.2. 重新分配分区

kafka-reassign-partitions.sh 实用程序用于将分区重新分配给不同的代理。

它有三种不同的模式:

--generate
取一组主题和代理,并生成 重新分配 JSON 文件,这会导致将这些主题的分区分配给这些代理。生成 重新分配 JSON 文件 的一种简单方法是,但它对整个主题进行操作,因此其使用并不始终合适。
--execute
取一个 重新分配 JSON 文件,并将其应用到集群中的分区和代理。获得分区的代理将遵循分区领导机。对于给定分区,当新代理发现并加入 ISR 后,旧代理将停止为后续,并删除其副本。
--verify
使用与 --execute 步骤相同的重新分配 JSON 文件--verify 会检查文件中所有分区是否已移至其预期的代理中。如果重新分配完成,它将删除任何无效 节流。除非被删除,throttles 将继续影响集群,即使重新分配完成后也是如此。

在任何给定时间,集群中只能有一个重新分配运行,且无法取消正在运行的重新分配。如果您需要取消重新分配,则必须等待它完成,然后执行另一个重新分配来恢复第一个重新分配的影响。kafka-reassign-partitions.sh 将打印这个 reversion 的重新分配 JSON 作为其输出的一部分。非常大的重新分配应该被分解为多个较小的重新分配,以防需要停止 in-progress 重新分配。

6.3.2.1. 重新分配 JSON 文件

重新分配 JSON 文件具有特定的结构:

{
  "version": 1,
  "partitions": [
    <PartitionObjects>
  ]
}
Copy to Clipboard Toggle word wrap

其中 <PartitionObjects > 是以逗号分隔的对象列表,如下所示:

{
  "topic": <TopicName>,
  "partition": <Partition>,
  "replicas": [ <AssignedBrokerIds> ],
  "log_dirs": [<LogDirs>]
}
Copy to Clipboard Toggle word wrap

"log_dirs" 属性是可选的,用于将分区移到特定的日志目录中。

以下是一个示例重新分配 JSON 文件,它将主题 topic-a, 分区 4 分配给代理 2, 47;主题 topic-b 的分区 2 分配给代理 1, 57

{
  "version": 1,
  "partitions": [
    {
      "topic": "topic-a",
      "partition": 4,
      "replicas": [2,4,7]
    },
    {
      "topic": "topic-b",
      "partition": 2,
      "replicas": [1,5,7]
    }
  ]
}
Copy to Clipboard Toggle word wrap

不包含在 JSON 中的分区不会改变。

6.3.2.2. 生成重新分配 JSON 文件

为给定一组代理分配给定主题的所有分区的最简单方法是使用 kafka-reassign-partitions.sh --generate 命令生成一个重新分配 JSON 文件。

bin/kafka-reassign-partitions.sh --zookeeper <ZooKeeper> --topics-to-move-json-file <TopicsFile> --broker-list <BrokerList> --generate
Copy to Clipboard Toggle word wrap

& lt;TopicsFile > 是一个 JSON 文件,它列出了要移动的主题。它具有以下结构:

{
  "version": 1,
  "topics": [
    <TopicObjects>
  ]
}
Copy to Clipboard Toggle word wrap

其中 <TopicObjects > 是以逗号分隔的对象列表,如下所示:

{
  "topic": <TopicName>
}
Copy to Clipboard Toggle word wrap

例如,要将 topic-atopic-b 的所有分区移动到代理 47

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-be-moved.json --broker-list 4,7 --generate
Copy to Clipboard Toggle word wrap

where topics-to-be-moved.json has contents:

{
  "version": 1,
  "topics": [
    { "topic": "topic-a"},
    { "topic": "topic-b"}
  ]
}
Copy to Clipboard Toggle word wrap

6.3.2.3. 手动创建重新分配 JSON 文件

如果要移动特定分区,您可以手动创建重新分配 JSON 文件。

6.3.3. 重新分配节流

重新分配分区可能会是一个较慢的过程,因为它可能需要在代理间移动大量数据。为了避免这会对客户端产生不利影响,可以限制重新分配。使用节流意味着重新分配需要更长的时间。如果节流过低,则新分配的代理将无法与发布记录保持同步,且重新分配永远不会完成。如果节流过高,客户端将会受到影响。例如,对于制作者,这可能比正常延迟高,等待确认。对于消费者,这可能因为轮询之间延迟更高的吞吐量导致的吞吐量下降。

6.3.4. 扩展 Kafka 集群

这个步骤描述了如何增加 Kafka 集群中的代理数量。

先决条件

  • 现有的 Kafka 集群。
  • 安装了 AMQ 代理的新机器。
  • 重新分配 JSON 文件,了解如何将分区重新分配给放大集群中的代理。

流程

  1. 使用与集群中其他代理相同的设置为新代理创建配置文件,但 broker.id 除外,它应该是未被任何其他代理使用的编号。
  2. 启动新的 Kafka 代理,将您在上一步中创建的配置文件作为 kafka-server-start.sh 脚本的参数:

    su - kafka
    /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
    Copy to Clipboard Toggle word wrap
  3. 验证 Kafka 代理是否正在运行。

    jcmd | grep Kafka
    Copy to Clipboard Toggle word wrap
  4. 为每个新代理重复上述步骤。
  5. 使用 kafka-reassign-partitions.sh 命令行工具执行分区重新分配。

    kafka-reassign-partitions.sh --zookeeper <ZooKeeperHostAndPort> --reassignment-json-file <ReassignmentJsonFile> --execute
    Copy to Clipboard Toggle word wrap

    如果要节流复制,您还可以通过 --throttle 选项,并传递一个 inter-broker 节流率(以字节/秒为单位)。例如:

    kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --throttle 5000000 --execute
    Copy to Clipboard Toggle word wrap

    此命令将输出两个重新分配 JSON 对象。第一个记录正在移动的分区的当前分配。如果需要稍后恢复重新分配,您应该将其保存到文件中。第二个 JSON 对象是在重新分配 JSON 文件中传递的目标重新分配。

  6. 如果您需要在重新分配过程中更改节流,您可以使用带有不同节流率相同的命令行。例如:

    kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --throttle 10000000 --execute
    Copy to Clipboard Toggle word wrap
  7. 使用 kafka-reassign-partitions.sh 命令行工具定期验证重新分配是否已完成。这与上一步的命令相同,但使用 --verify 选项而不是 --execute 选项。

    kafka-reassign-partitions.sh --zookeeper <ZooKeeperHostAndPort> --reassignment-json-file <ReassignmentJsonFile> --verify
    Copy to Clipboard Toggle word wrap

    例如:

    kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --verify
    Copy to Clipboard Toggle word wrap
  8. --verify 命令报告以成功完成方式移动的每个分区时,重新分配已完成。最后 --verify 也具有删除任何重新分配节流的影响。现在,如果您保存了 JSON 以将分配恢复到其原始代理,您可以删除恢复的文件。

6.3.5. 缩减 Kafka 集群

其他资源

这个步骤描述了如何减少 Kafka 集群中的代理数量。

先决条件

  • 现有的 Kafka 集群。
  • 重新分配 JSON 文件,在代理被删除后,如何将分区重新分配给集群中的代理。

流程

  1. 使用 kafka-reassign-partitions.sh 命令行工具执行分区重新分配。

    kafka-reassign-partitions.sh --zookeeper <ZooKeeperHostAndPort> --reassignment-json-file <ReassignmentJsonFile> --execute
    Copy to Clipboard Toggle word wrap

    如果要节流复制,您还可以通过 --throttle 选项,并传递一个 inter-broker 节流率(以字节/秒为单位)。例如:

    kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --throttle 5000000 --execute
    Copy to Clipboard Toggle word wrap

    此命令将输出两个重新分配 JSON 对象。第一个记录正在移动的分区的当前分配。如果需要稍后恢复重新分配,您应该将其保存到文件中。第二个 JSON 对象是在重新分配 JSON 文件中传递的目标重新分配。

  2. 如果您需要在重新分配过程中更改节流,您可以使用带有不同节流率相同的命令行。例如:

    kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --throttle 10000000 --execute
    Copy to Clipboard Toggle word wrap
  3. 使用 kafka-reassign-partitions.sh 命令行工具定期验证重新分配是否已完成。这与上一步的命令相同,但使用 --verify 选项而不是 --execute 选项。

    kafka-reassign-partitions.sh --zookeeper <ZooKeeperHostAndPort> --reassignment-json-file <ReassignmentJsonFile> --verify
    Copy to Clipboard Toggle word wrap

    例如:

    kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --verify
    Copy to Clipboard Toggle word wrap
  4. --verify 命令报告以成功完成方式移动的每个分区时,重新分配已完成。最后 --verify 也具有删除任何重新分配节流的影响。现在,如果您保存了 JSON 以将分配恢复到其原始代理,您可以删除恢复的文件。
  5. 所有分区重新分配完成后,会删除代理应该不会对集群中的任何分区负责。您可以通过检查代理的 log.dirs 配置参数中提供的每个目录来验证这一点。如果代理中的任何日志目录包含一个与扩展正则表达式 \.[a-z0-9]-delete$ 不匹配的目录,则代理仍具有实时分区,不应停止。

    您可以通过执行以下命令来检查它:

    ls -l <LogDir> | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'
    Copy to Clipboard Toggle word wrap

    如果上述命令打印任何输出,则代理仍有实时分区。在这种情况下,重新分配还没有完成,或者重新分配 JSON 文件不正确。

  6. 确认代理没有实时分区后,就可以停止它。

    su - kafka
    /opt/kafka/bin/kafka-server-stop.sh
    Copy to Clipboard Toggle word wrap
  7. 确认 Kafka 代理已停止。

    jcmd | grep kafka
    Copy to Clipboard Toggle word wrap

6.3.6. 扩展 ZooKeeper 集群

这个步骤描述了如何在 ZooKeeper 集群中添加服务器(节点)。ZooKeeper 的动态重新配置 功能在扩展过程中维护稳定的 ZooKeeper 集群。

先决条件

  • ZooKeeper 配置文件中启用了动态重新配置(reconfigEnabled=true)。
  • 启用了 ZooKeeper 身份验证,您可以使用身份验证机制访问新的服务器。

流程

对每个 ZooKeeper 服务器执行以下步骤,一次:

  1. 在 ZooKeeper 集群中添加服务器,如 第 3.3 节 “运行多节点 ZooKeeper 集群” 所述,然后启动 ZooKeeper。
  2. 请注意新服务器的 IP 地址和配置的访问端口。
  3. 为服务器启动 zookeeper-shell 会话。从可以访问集群的机器中运行以下命令(如果可以访问集群,这可能是 ZooKeeper 节点或本地机器之一)。

    su - kafka
    /opt/kafka/bin/zookeeper-shell.sh <ip-address>:<zk-port>
    Copy to Clipboard Toggle word wrap
  4. 在 shell 会话中,运行 ZooKeeper 节点,输入以下行将新服务器作为投票成员添加到仲裁中:

    reconfig -add server.<positive-id> = <address1>:<port1>:<port2>[:role];[<client-port-address>:]<client-port>
    Copy to Clipboard Toggle word wrap

    例如:

    reconfig -add server.4=172.17.0.4:2888:3888:participant;172.17.0.4:2181
    Copy to Clipboard Toggle word wrap

    其中 <positive-id> 是新服务器 ID 4

    对于两个端口,<port1> 2888 用于 ZooKeeper 服务器之间的通信,<port2> 3888 用于领导选举机制。

    新配置传播到 ZooKeeper 集群中的其他服务器 ; 新服务器现在是仲裁的完整成员。

  5. 对您要添加的其他服务器重复步骤 1-4。

6.3.7. 缩减 ZooKeeper 集群

这个步骤描述了如何从 ZooKeeper 集群中删除服务器(节点)。ZooKeeper 的动态重新配置 功能在缩减过程中维护稳定的 ZooKeeper 集群。

先决条件

  • ZooKeeper 配置文件中启用了动态重新配置(reconfigEnabled=true)。
  • 启用了 ZooKeeper 身份验证,您可以使用身份验证机制访问新的服务器。

流程

对每个 ZooKeeper 服务器执行以下步骤,一次:

  1. 登录到在缩减后会保留的其中一个服务器上的 zookeeper-shell(例如,服务器 1)。

    注意

    使用为 ZooKeeper 集群配置的验证机制访问服务器。

  2. 删除服务器,如 server 5。

    reconfig -remove 5
    Copy to Clipboard Toggle word wrap
  3. 取消激活您删除的服务器。
  4. 重复步骤 1-3 以减小集群大小。
返回顶部
Red Hat logoGithubredditYoutubeTwitter

学习

尝试、购买和销售

社区

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。 了解我们当前的更新.

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

Theme

© 2025 Red Hat