6.3. 扩展集群
6.3.1. 扩展 Kafka 集群 复制链接链接已复制到粘贴板!
6.3.1.1. 在集群中添加代理 复制链接链接已复制到粘贴板!
增加主题吞吐量的主要方法是增加该主题的分区数。这可以正常工作,因为分区允许在集群中的代理之间共享该主题的负载。当某些资源(通常是 I/O)的限制代理时,使用更多分区不会增加吞吐量。反之,您必须在集群中添加代理。
当您向集群添加额外的代理时,AMQ Streams 不会自动将任何分区分配给它。您必须决定将哪些分区从现有代理移到新代理中。
在所有代理间重新分发分区后,每个代理都应该有较低的资源使用率。
6.3.1.2. 从集群中删除代理 复制链接链接已复制到粘贴板!
从集群中删除代理前,您必须确保它没有被分配给任何分区。您应该决定哪些剩余的代理将负责代理中的每个分区被停用。代理没有分配的分区后,您可以停止它。
6.3.2. 重新分配分区 复制链接链接已复制到粘贴板!
kafka-reassign-partitions.sh
实用程序用于将分区重新分配给不同的代理。
它有三种不同的模式:
--generate
- 取一组主题和代理,并生成 重新分配 JSON 文件,这会导致将这些主题的分区分配给这些代理。生成 重新分配 JSON 文件 的一种简单方法是,但它对整个主题进行操作,因此其使用并不始终合适。
--execute
- 取一个 重新分配 JSON 文件,并将其应用到集群中的分区和代理。获得分区的代理将遵循分区领导机。对于给定分区,当新代理发现并加入 ISR 后,旧代理将停止为后续,并删除其副本。
--verify
-
使用与
--execute
步骤相同的重新分配 JSON 文件,--verify
会检查文件中所有分区是否已移至其预期的代理中。如果重新分配完成,它将删除任何无效 节流。除非被删除,throttles 将继续影响集群,即使重新分配完成后也是如此。
在任何给定时间,集群中只能有一个重新分配运行,且无法取消正在运行的重新分配。如果您需要取消重新分配,则必须等待它完成,然后执行另一个重新分配来恢复第一个重新分配的影响。kafka-reassign-partitions.sh
将打印这个 reversion 的重新分配 JSON 作为其输出的一部分。非常大的重新分配应该被分解为多个较小的重新分配,以防需要停止 in-progress 重新分配。
6.3.2.1. 重新分配 JSON 文件 复制链接链接已复制到粘贴板!
重新分配 JSON 文件具有特定的结构:
其中 <PartitionObjects > 是以逗号分隔的对象列表,如下所示:
"log_dirs"
属性是可选的,用于将分区移到特定的日志目录中。
以下是一个示例重新分配 JSON 文件,它将主题 topic-a
, 分区 4
分配给代理 2
, 4
和 7
;主题 topic-b
的分区 2
分配给代理 1
, 5
和 7
:
不包含在 JSON 中的分区不会改变。
6.3.2.2. 生成重新分配 JSON 文件 复制链接链接已复制到粘贴板!
为给定一组代理分配给定主题的所有分区的最简单方法是使用 kafka-reassign-partitions.sh --generate
命令生成一个重新分配 JSON 文件。
bin/kafka-reassign-partitions.sh --zookeeper <ZooKeeper> --topics-to-move-json-file <TopicsFile> --broker-list <BrokerList> --generate
bin/kafka-reassign-partitions.sh --zookeeper <ZooKeeper> --topics-to-move-json-file <TopicsFile> --broker-list <BrokerList> --generate
& lt;TopicsFile
> 是一个 JSON 文件,它列出了要移动的主题。它具有以下结构:
其中 <TopicObjects > 是以逗号分隔的对象列表,如下所示:
{ "topic": <TopicName> }
{
"topic": <TopicName>
}
例如,要将 topic-a
和 topic-b
的所有分区移动到代理 4
和 7
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-be-moved.json --broker-list 4,7 --generate
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-be-moved.json --broker-list 4,7 --generate
where topics-to-be-moved.json
has contents:
6.3.2.3. 手动创建重新分配 JSON 文件 复制链接链接已复制到粘贴板!
如果要移动特定分区,您可以手动创建重新分配 JSON 文件。
6.3.3. 重新分配节流 复制链接链接已复制到粘贴板!
重新分配分区可能会是一个较慢的过程,因为它可能需要在代理间移动大量数据。为了避免这会对客户端产生不利影响,可以限制重新分配。使用节流意味着重新分配需要更长的时间。如果节流过低,则新分配的代理将无法与发布记录保持同步,且重新分配永远不会完成。如果节流过高,客户端将会受到影响。例如,对于制作者,这可能比正常延迟高,等待确认。对于消费者,这可能因为轮询之间延迟更高的吞吐量导致的吞吐量下降。
6.3.4. 扩展 Kafka 集群 复制链接链接已复制到粘贴板!
这个步骤描述了如何增加 Kafka 集群中的代理数量。
先决条件
- 现有的 Kafka 集群。
- 安装了 AMQ 代理的新机器。
- 重新分配 JSON 文件,了解如何将分区重新分配给放大集群中的代理。
流程
-
使用与集群中其他代理相同的设置为新代理创建配置文件,但
broker.id
除外,它应该是未被任何其他代理使用的编号。 启动新的 Kafka 代理,将您在上一步中创建的配置文件作为
kafka-server-start.sh
脚本的参数:su - kafka /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
su - kafka /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 验证 Kafka 代理是否正在运行。
jcmd | grep Kafka
jcmd | grep Kafka
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 为每个新代理重复上述步骤。
使用
kafka-reassign-partitions.sh
命令行工具执行分区重新分配。kafka-reassign-partitions.sh --zookeeper <ZooKeeperHostAndPort> --reassignment-json-file <ReassignmentJsonFile> --execute
kafka-reassign-partitions.sh --zookeeper <ZooKeeperHostAndPort> --reassignment-json-file <ReassignmentJsonFile> --execute
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 如果要节流复制,您还可以通过
--throttle
选项,并传递一个 inter-broker 节流率(以字节/秒为单位)。例如:kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --throttle 5000000 --execute
kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --throttle 5000000 --execute
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 此命令将输出两个重新分配 JSON 对象。第一个记录正在移动的分区的当前分配。如果需要稍后恢复重新分配,您应该将其保存到文件中。第二个 JSON 对象是在重新分配 JSON 文件中传递的目标重新分配。
如果您需要在重新分配过程中更改节流,您可以使用带有不同节流率相同的命令行。例如:
kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --throttle 10000000 --execute
kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --throttle 10000000 --execute
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 使用
kafka-reassign-partitions.sh
命令行工具定期验证重新分配是否已完成。这与上一步的命令相同,但使用--verify
选项而不是--execute
选项。kafka-reassign-partitions.sh --zookeeper <ZooKeeperHostAndPort> --reassignment-json-file <ReassignmentJsonFile> --verify
kafka-reassign-partitions.sh --zookeeper <ZooKeeperHostAndPort> --reassignment-json-file <ReassignmentJsonFile> --verify
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 例如:
kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --verify
kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --verify
Copy to Clipboard Copied! Toggle word wrap Toggle overflow -
当
--verify
命令报告以成功完成方式移动的每个分区时,重新分配已完成。最后--verify
也具有删除任何重新分配节流的影响。现在,如果您保存了 JSON 以将分配恢复到其原始代理,您可以删除恢复的文件。
6.3.5. 缩减 Kafka 集群 复制链接链接已复制到粘贴板!
其他资源
这个步骤描述了如何减少 Kafka 集群中的代理数量。
先决条件
- 现有的 Kafka 集群。
- 重新分配 JSON 文件,在代理被删除后,如何将分区重新分配给集群中的代理。
流程
使用
kafka-reassign-partitions.sh
命令行工具执行分区重新分配。kafka-reassign-partitions.sh --zookeeper <ZooKeeperHostAndPort> --reassignment-json-file <ReassignmentJsonFile> --execute
kafka-reassign-partitions.sh --zookeeper <ZooKeeperHostAndPort> --reassignment-json-file <ReassignmentJsonFile> --execute
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 如果要节流复制,您还可以通过
--throttle
选项,并传递一个 inter-broker 节流率(以字节/秒为单位)。例如:kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --throttle 5000000 --execute
kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --throttle 5000000 --execute
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 此命令将输出两个重新分配 JSON 对象。第一个记录正在移动的分区的当前分配。如果需要稍后恢复重新分配,您应该将其保存到文件中。第二个 JSON 对象是在重新分配 JSON 文件中传递的目标重新分配。
如果您需要在重新分配过程中更改节流,您可以使用带有不同节流率相同的命令行。例如:
kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --throttle 10000000 --execute
kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --throttle 10000000 --execute
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 使用
kafka-reassign-partitions.sh
命令行工具定期验证重新分配是否已完成。这与上一步的命令相同,但使用--verify
选项而不是--execute
选项。kafka-reassign-partitions.sh --zookeeper <ZooKeeperHostAndPort> --reassignment-json-file <ReassignmentJsonFile> --verify
kafka-reassign-partitions.sh --zookeeper <ZooKeeperHostAndPort> --reassignment-json-file <ReassignmentJsonFile> --verify
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 例如:
kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --verify
kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file reassignment.json --verify
Copy to Clipboard Copied! Toggle word wrap Toggle overflow -
当
--verify
命令报告以成功完成方式移动的每个分区时,重新分配已完成。最后--verify
也具有删除任何重新分配节流的影响。现在,如果您保存了 JSON 以将分配恢复到其原始代理,您可以删除恢复的文件。 所有分区重新分配完成后,会删除代理应该不会对集群中的任何分区负责。您可以通过检查代理的
log.dirs
配置参数中提供的每个目录来验证这一点。如果代理中的任何日志目录包含一个与扩展正则表达式\.[a-z0-9]-delete$
不匹配的目录,则代理仍具有实时分区,不应停止。您可以通过执行以下命令来检查它:
ls -l <LogDir> | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'
ls -l <LogDir> | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 如果上述命令打印任何输出,则代理仍有实时分区。在这种情况下,重新分配还没有完成,或者重新分配 JSON 文件不正确。
确认代理没有实时分区后,就可以停止它。
su - kafka /opt/kafka/bin/kafka-server-stop.sh
su - kafka /opt/kafka/bin/kafka-server-stop.sh
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 确认 Kafka 代理已停止。
jcmd | grep kafka
jcmd | grep kafka
Copy to Clipboard Copied! Toggle word wrap Toggle overflow
6.3.6. 扩展 ZooKeeper 集群 复制链接链接已复制到粘贴板!
这个步骤描述了如何在 ZooKeeper 集群中添加服务器(节点)。ZooKeeper 的动态重新配置 功能在扩展过程中维护稳定的 ZooKeeper 集群。
先决条件
-
ZooKeeper 配置文件中启用了动态重新配置(
reconfigEnabled=true
)。 - 启用了 ZooKeeper 身份验证,您可以使用身份验证机制访问新的服务器。
流程
对每个 ZooKeeper 服务器执行以下步骤,一次:
- 在 ZooKeeper 集群中添加服务器,如 第 3.3 节 “运行多节点 ZooKeeper 集群” 所述,然后启动 ZooKeeper。
- 请注意新服务器的 IP 地址和配置的访问端口。
为服务器启动
zookeeper-shell
会话。从可以访问集群的机器中运行以下命令(如果可以访问集群,这可能是 ZooKeeper 节点或本地机器之一)。su - kafka /opt/kafka/bin/zookeeper-shell.sh <ip-address>:<zk-port>
su - kafka /opt/kafka/bin/zookeeper-shell.sh <ip-address>:<zk-port>
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 在 shell 会话中,运行 ZooKeeper 节点,输入以下行将新服务器作为投票成员添加到仲裁中:
reconfig -add server.<positive-id> = <address1>:<port1>:<port2>[:role];[<client-port-address>:]<client-port>
reconfig -add server.<positive-id> = <address1>:<port1>:<port2>[:role];[<client-port-address>:]<client-port>
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 例如:
reconfig -add server.4=172.17.0.4:2888:3888:participant;172.17.0.4:2181
reconfig -add server.4=172.17.0.4:2888:3888:participant;172.17.0.4:2181
Copy to Clipboard Copied! Toggle word wrap Toggle overflow 其中
<positive-id>
是新服务器 ID4
。对于两个端口,
<port1>
2888 用于 ZooKeeper 服务器之间的通信,<port2>
3888 用于领导选举机制。新配置传播到 ZooKeeper 集群中的其他服务器 ; 新服务器现在是仲裁的完整成员。
- 对您要添加的其他服务器重复步骤 1-4。
6.3.7. 缩减 ZooKeeper 集群 复制链接链接已复制到粘贴板!
这个步骤描述了如何从 ZooKeeper 集群中删除服务器(节点)。ZooKeeper 的动态重新配置 功能在缩减过程中维护稳定的 ZooKeeper 集群。
先决条件
-
ZooKeeper 配置文件中启用了动态重新配置(
reconfigEnabled=true
)。 - 启用了 ZooKeeper 身份验证,您可以使用身份验证机制访问新的服务器。
流程
对每个 ZooKeeper 服务器执行以下步骤,一次:
登录到在缩减后会保留的其中一个服务器上的
zookeeper-shell
(例如,服务器 1)。注意使用为 ZooKeeper 集群配置的验证机制访问服务器。
删除服务器,如 server 5。
reconfig -remove 5
reconfig -remove 5
Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 取消激活您删除的服务器。
- 重复步骤 1-3 以减小集群大小。