10.2. 扩展 Kafka 集群
您可以从 Kafka 集群中添加或删除代理。您还可以从 ZooKeeper 集群中添加或删除节点。
当您添加或删除代理时,您可以使用 kafka-reassign-partitions.sh 分配分区。
您还可以在重新平衡 Kafka 集群时,使用 Cruise Control 合并对代理数量的更改。您可以安装新的代理,并在重新平衡中包括它们。您可以执行从重新平衡中排除代理的重新平衡,然后再删除它们。更多信息请参阅 第 14 章 使用 Cruise Control 进行集群重新平衡。
10.2.1. 从 Kafka 集群中添加和删除代理 复制链接链接已复制到粘贴板!
增加主题吞吐量的主要方法是增加该主题的分区数。分区允许在集群中的代理之间共享该主题的负载。当某个资源(通常是 I/O)的限制代理时,使用更多分区将无法增加吞吐量。反之,您必须在集群中添加代理。
当您向集群添加额外的代理时,AMQ Streams 不会自动为其分配任何分区。您必须决定将哪些分区从现有代理移到新代理中。在所有代理间重新分发分区后,每个代理都应降低资源利用率。
在从集群中删除代理前,您必须确保它没有被分配给任何分区。您应该决定哪些剩余的代理将负责代理中的每个分区被停用。当代理没有分配分区后,您可以停止它。
10.2.2. 重新分配分区 复制链接链接已复制到粘贴板!
kafka-reassign-partitions.sh 程序用来将分区重新分配给不同的代理。
它有三个不同的模式:
--generate- 取一组主题和代理并生成 重新分配 JSON 文件,这将导致这些主题的分区被分配给这些代理。生成重新分配 JSON 文件的一种简便方法,但它对整个主题运行,因此不能始终使用。
--execute- 请求一个 重新分配 JSON 文件,并将其应用到集群中的分区和代理。获得分区的代理将成为分区领导人。对于给定分区,当新代理发现并加入 ISR 后,旧代理就停止成为后续程序,并将删除其副本。
--verify-
使用与
--execute步骤相同的重新分配 JSON 文件,--verify会检查文件中所有分区是否已移至其预期的代理中。如果重新分配完成,它将删除任何有效的 节流。除非被删除,否则节流仍然会影响到集群,即使重新分配完成后也是如此。
在任何给定时间,集群中只能运行一个重新分配功能,且无法取消对正在运行的重新分配。如果您需要取消重新分配,需要等待它完成,然后执行另一个重新分配来恢复第一个的影响。kafka-reassign-partitions.sh 将打印这个 reversion 的重新分配 JSON 作为其输出的一部分。非常大的重新分配应该被分解为多个较小的重新分配,以防需要停止 in-progress 重新分配。
10.2.2.1. 重新分配 JSON 文件 复制链接链接已复制到粘贴板!
重新分配 JSON 文件 具有特定结构:
其中 <PartitionObjects > 是一个以逗号分隔的对象列表,如下所示:
"log_dirs" 属性是可选的,用于将分区移到特定的日志目录中。
以下是一个示例重新分配 JSON 文件,它将主题 topic-a, 分区 4 分配给代理 2, 4 和 7;主题 topic-b 的分区 2 分配给代理 1, 5 和 7:
JSON 中没有包括的分区不会改变。
10.2.2.2. 生成重新分配 JSON 文件 复制链接链接已复制到粘贴板!
将给定组主题的所有分区分配给给定代理集的最简单方法是使用 kafka-reassign-partitions.sh --generate 命令生成重新分配 JSON 文件。
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --topics-to-move-json-file <topics_file> --broker-list <broker_list> --generate
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --topics-to-move-json-file <topics_file> --broker-list <broker_list> --generate
& lt;topics_file > 是一个 JSON 文件,列出了要移动的主题。它具有以下结构:
其中 <topic_objects > 是一个以逗号分隔的对象列表,如下所示:
{
"topic": <TopicName>
}
{
"topic": <TopicName>
}
例如,要将 topic-a 和 topic-b 的所有分区移动到代理 4 和 7
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-be-moved.json --broker-list 4,7 --generate
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-be-moved.json --broker-list 4,7 --generate
where topics-to-be-moved.json has contents:
10.2.2.3. 手动创建重新分配 JSON 文件 复制链接链接已复制到粘贴板!
如果要移动特定分区,可以手动创建重新分配 JSON 文件。
10.2.3. rement throttles 复制链接链接已复制到粘贴板!
重新分配分区的过程可能很慢,因为它可能需要在代理间移动大量数据。为了避免对客户端造成不利影响,可以限制 重新分配。使用节流可能意味着重新分配时间较长。如果节流过低,则新分配的代理将无法与发布记录保持同步,且重新分配永远不会完成。如果节流过高,则客户端将会受到影响。例如,对于制作者,这可能会以高于正常延迟形式的清单,等待确认。对于消费者,这可能会作为延迟在轮询之间导致的吞吐量下降。
10.2.4. 扩展 Kafka 集群 复制链接链接已复制到粘贴板!
这个步骤描述了如何增加 Kafka 集群中的代理数量。
先决条件
- 现有 Kafka 集群。
- 安装了 AMQ 代理的新机器。
- 重新分配 分区的 JSON 文件,如何将分区重新分配给 enlarged 集群中的代理。
流程
-
使用与集群中其他代理相同的设置为新代理创建配置文件,但
broker.id除外,它应该是未被任何其他代理使用的编号。 启动新的 Kafka 代理,将上一步中创建的配置文件作为
kafka-server-start.sh脚本的参数传递:su - kafka /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
su - kafka /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.propertiesCopy to Clipboard Copied! Toggle word wrap Toggle overflow 验证 Kafka 代理是否正在运行。
jcmd | grep Kafka
jcmd | grep KafkaCopy to Clipboard Copied! Toggle word wrap Toggle overflow - 为每个新代理重复上述步骤。
使用
kafka-reassign-partitions.sh命令行工具运行分区重新分配。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --execute
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --executeCopy to Clipboard Copied! Toggle word wrap Toggle overflow 如果要节流复制,您也可以传递
--throttle选项,并用每秒字节数(以字节/秒为单位)传递 --throttle 选项。例如:/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 5000000 --execute
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 5000000 --executeCopy to Clipboard Copied! Toggle word wrap Toggle overflow 此命令将打印出两个重新分配 JSON 对象。第一个记录移动分区的当前分配。如果稍后需要恢复重新分配,您应将它保存到文件中。第二个 JSON 对象是您在重新分配 JSON 文件中传递的目标重新分配。
如果您需要在重新分配过程中更改节流,您可以使用带有不同节流率相同的命令行。例如:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 10000000 --execute
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 10000000 --executeCopy to Clipboard Copied! Toggle word wrap Toggle overflow 使用
kafka-reassign-partitions.sh命令行工具定期验证重新分配是否完成。这与上一步中的命令相同,但使用--verify选项而不是--execute选项。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --verify
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --verifyCopy to Clipboard Copied! Toggle word wrap Toggle overflow 例如:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --verify
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --verifyCopy to Clipboard Copied! Toggle word wrap Toggle overflow -
当
--verify命令报告每个分区被成功完成时,重新分配已完成。最后,--verify也会使删除任何重新分配节流的影响。现在,如果您保存了将分配恢复到其原始代理的 JSON,您可以删除恢复文件。
10.2.5. 缩减 Kafka 集群 复制链接链接已复制到粘贴板!
这个步骤描述了如何减少 Kafka 集群中的代理数量。
先决条件
- 现有 Kafka 集群。
- 在代理被删除后,重新分配分区的 JSON 文件 如何将分区重新分配给集群中的代理。
流程
使用
kafka-reassign-partitions.sh命令行工具运行分区重新分配。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --execute
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --executeCopy to Clipboard Copied! Toggle word wrap Toggle overflow 如果要节流复制,您也可以传递
--throttle选项,并用每秒字节数(以字节/秒为单位)传递 --throttle 选项。例如:/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 5000000 --execute
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 5000000 --executeCopy to Clipboard Copied! Toggle word wrap Toggle overflow 此命令将打印出两个重新分配 JSON 对象。第一个记录移动分区的当前分配。如果稍后需要恢复重新分配,您应将它保存到文件中。第二个 JSON 对象是您在重新分配 JSON 文件中传递的目标重新分配。
如果您需要在重新分配过程中更改节流,您可以使用带有不同节流率相同的命令行。例如:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 10000000 --execute
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 10000000 --executeCopy to Clipboard Copied! Toggle word wrap Toggle overflow 使用
kafka-reassign-partitions.sh命令行工具定期验证重新分配是否完成。这与上一步中的命令相同,但使用--verify选项而不是--execute选项。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --verify
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --verifyCopy to Clipboard Copied! Toggle word wrap Toggle overflow 例如:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --verify
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --verifyCopy to Clipboard Copied! Toggle word wrap Toggle overflow -
当
--verify命令报告每个分区被成功完成时,重新分配已完成。最后,--verify也会使删除任何重新分配节流的影响。现在,如果您保存了将分配恢复到其原始代理的 JSON,您可以删除恢复文件。 检查正在删除的每个代理都在其日志中没有任何实时分区(
log.dirs)。ls -l <LogDir> | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'
ls -l <LogDir> | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'Copy to Clipboard Copied! Toggle word wrap Toggle overflow 如果日志目录与正则表达式
\.[a-z0-9]-delete$匹配,则活动分区仍然存在。如果您有活跃的分区,检查重新分配已完成,或者在重新分配 JSON 文件中的配置。您可以再次运行重新分配。确保进入下一步之前没有活动分区。停止代理。
su - kafka /opt/kafka/bin/kafka-server-stop.sh
su - kafka /opt/kafka/bin/kafka-server-stop.shCopy to Clipboard Copied! Toggle word wrap Toggle overflow 确认 Kafka 代理已停止。
jcmd | grep kafka
jcmd | grep kafkaCopy to Clipboard Copied! Toggle word wrap Toggle overflow
10.2.6. 扩展 ZooKeeper 集群 复制链接链接已复制到粘贴板!
这个步骤描述了如何将服务器(节点)添加到 ZooKeeper 集群。ZooKeeper 的动态重新配置 功能在扩展过程中维护稳定的 ZooKeeper 集群。
先决条件
-
在 ZooKeeper 配置文件中启用动态重新配置(
reconfigEnabled=true)。 - 启用 zookeeper 身份验证,您可以使用身份验证机制访问新服务器。
流程
对每个 ZooKeeper 服务器执行以下步骤,一次:
- 在 ZooKeeper 集群中添加服务器,如 第 3.1 节 “运行多节点 ZooKeeper 集群” 所述,然后启动 ZooKeeper。
- 请注意,IP 地址并配置了新服务器的访问端口。
为服务器启动
zookeeper-shell会话。从可访问集群的机器上运行以下命令(这可能是其中一个 ZooKeeper 节点或本地机器(如果可以访问)。su - kafka /opt/kafka/bin/zookeeper-shell.sh <ip-address>:<zk-port>
su - kafka /opt/kafka/bin/zookeeper-shell.sh <ip-address>:<zk-port>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 在 shell 会话中,运行 ZooKeeper 节点,输入以下命令将新服务器作为投票成员添加到仲裁中:
reconfig -add server.<positive-id> = <address1>:<port1>:<port2>[:role];[<client-port-address>:]<client-port>
reconfig -add server.<positive-id> = <address1>:<port1>:<port2>[:role];[<client-port-address>:]<client-port>Copy to Clipboard Copied! Toggle word wrap Toggle overflow 例如:
reconfig -add server.4=172.17.0.4:2888:3888:participant;172.17.0.4:2181
reconfig -add server.4=172.17.0.4:2888:3888:participant;172.17.0.4:2181Copy to Clipboard Copied! Toggle word wrap Toggle overflow 其中
<positive-id>是新服务器 ID4。对于两个端口,
<port1>2888 用于 ZooKeeper 服务器之间的通信,<port2>3888 用于领导选举机制。新的配置传播到 ZooKeeper 集群中的其他服务器 ; 新服务器现在是仲裁的完整成员。
- 对您要添加的其他服务器重复第 1-4 步。
10.2.7. 缩减 ZooKeeper 集群 复制链接链接已复制到粘贴板!
这个步骤描述了如何从 ZooKeeper 集群中删除服务器(节点)。ZooKeeper 的动态重新配置 功能在缩减过程中维护稳定的 ZooKeeper 集群。
先决条件
-
在 ZooKeeper 配置文件中启用动态重新配置(
reconfigEnabled=true)。 - 启用 zookeeper 身份验证,您可以使用身份验证机制访问新服务器。
流程
对每个 ZooKeeper 服务器执行以下步骤,一次:
登录到在缩减后会保留的其中一个服务器上的
zookeeper-shell(例如,服务器 1)。注意使用为 ZooKeeper 集群配置的验证机制访问服务器。
删除服务器,如 server 5。
reconfig -remove 5
reconfig -remove 5Copy to Clipboard Copied! Toggle word wrap Toggle overflow - 取消激活您删除的服务器。
- 重复步骤 1-3 以减小集群大小。