12.2. 扩展 Kafka 集群
您可以在 Kafka 集群中添加或删除代理。您还可以从 ZooKeeper 集群中添加或删除节点。
当您添加或删除代理时,您可以使用 kafka-reassign-partitions.sh 来分配分区。
您还可以使用 Cruise Control 在重新平衡 Kafka 集群时包含对代理数量的更改。您可以安装新代理,并将它们包含在重新平衡中。在移除代理前,可以执行将代理从重新平衡中排除的重新平衡。更多信息请参阅 第 14 章 使用 Cruise Control 进行集群重新平衡。
12.2.1. 在 Kafka 集群中添加和删除代理 复制链接链接已复制到粘贴板!
增加主题吞吐量的主要方法是增加该主题的分区数。分区允许在集群中的代理之间共享该主题的负载。当某个资源(通常是 I/O)的限制代理时,使用更多分区将无法增加吞吐量。反之,您必须在集群中添加代理。
当您在集群中添加额外代理时,AMQ Streams 不会自动为其分配任何分区。您必须决定将哪些分区从现有代理移到新代理中。在所有代理间重新分发分区后,每个代理应具有较低的资源利用率。
在从集群中删除代理前,您必须确保没有将其分配给任何分区。您应该决定将哪些其余代理负责代理上的每个分区。代理没有分配的分区后,您可以停止它。
12.2.2. 重新分配分区 复制链接链接已复制到粘贴板!
kafka-reassign-partitions.sh 工具用于将分区重新分配给不同的代理。
它有三种不同的模式:
--generate- 取一组主题和代理,并生成 重新分配 JSON 文件,这将使这些主题的分区分配给这些代理。生成 重新分配 JSON 文件 的一种简单方法,但它可在整个主题上运行,因此其使用并不总是合适。
--execute- 取 重新分配 JSON 文件,并将其应用到集群中的分区和代理。获得分区的代理将遵循分区领导机。对于给定分区,新代理发现并加入 ISR 后,旧代理会停止成为后续代理,并删除其副本。
--verify-
使用与
--execute步骤相同的重新分配 JSON 文件,--verify会检查文件中所有分区是否已移至其预期的代理中。如果重新分配已完成,它也会删除生效的任何 节流。除非被删除,否则在重新分配完成后也会继续影响集群。
在任何给定时间,集群中只能有一个重新分配功能,且无法取消正在运行的重新分配。如果您需要取消重新分配,您必须等待它完成,然后执行另一个重新分配来恢复第一个的影响。kafka-reassign-partitions.sh 将打印这个 reversion 的重新分配 JSON 作为其输出的一部分。非常大的重新分配应该被分解为多个较小的重新分配,以防需要停止 in-progress 重新分配。
12.2.2.1. 重新分配 JSON 文件 复制链接链接已复制到粘贴板!
重新分配 JSON 文件具有特定的结构:
{
"version": 1,
"partitions": [
<PartitionObjects>
]
}
其中 <PartitionObjects > 是以逗号分隔的对象列表,如下所示:
{
"topic": <TopicName>,
"partition": <Partition>,
"replicas": [ <AssignedBrokerIds> ],
"log_dirs": [<LogDirs>]
}
"log_dirs" 属性是可选的,用于将分区移到特定的日志目录中。
以下是一个示例重新分配 JSON 文件,它将主题 topic-a, 分区 4 分配给代理 2, 4 和 7;主题 topic-b 的分区 2 分配给代理 1, 5 和 7:
{
"version": 1,
"partitions": [
{
"topic": "topic-a",
"partition": 4,
"replicas": [2,4,7]
},
{
"topic": "topic-b",
"partition": 2,
"replicas": [1,5,7]
}
]
}
没有包括在 JSON 中的分区不会改变。
12.2.2.2. 生成重新分配 JSON 文件 复制链接链接已复制到粘贴板!
为给定一组主题分配所有分区的最简单方法是,使用 kafka-reassign-partitions.sh --generate 命令生成重新分配 JSON 文件。
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --topics-to-move-json-file <topics_file> --broker-list <broker_list> --generate
& lt;topics_file > 是一个 JSON 文件,列出了要移动的主题。它具有以下结构:
{
"version": 1,
"topics": [
<topic_objects>
]
}
其中 <topic_objects > 是一个以逗号分隔的对象列表,如下所示:
{
"topic": <TopicName>
}
例如,要将 topic-a 和 topic-b 的所有分区移动到代理 4 和 7
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-be-moved.json --broker-list 4,7 --generate
where topics-to-be-moved.json has contents:
{
"version": 1,
"topics": [
{ "topic": "topic-a"},
{ "topic": "topic-b"}
]
}
12.2.2.3. 手动创建重新分配 JSON 文件 复制链接链接已复制到粘贴板!
如果要移动特定分区,您可以手动创建重新分配 JSON 文件。
12.2.3. 重新分配节流 复制链接链接已复制到粘贴板!
重新分配分区可能是一个较慢的过程,因为它需要在代理间移动大量数据。为了避免这会对客户端产生不利影响,可以限制重新分配。使用节流可能意味着重新分配需要更长的时间。如果节流过低,则新分配的代理将无法与发布记录保持同步,且重新分配永远不会完成。如果节流太高,客户端将会受到影响。例如,对于制作者,这所可能的清单高于等待确认的正常延迟。对于消费者,这可能会在轮询之间延迟更高导致的吞吐量下降。
12.2.4. 扩展 Kafka 集群 复制链接链接已复制到粘贴板!
这个步骤描述了如何增加 Kafka 集群中的代理数量。
先决条件
- 现有的 Kafka 集群。
- 安装了 AMQ 代理的新机器。
- 重新分配 JSON 文件,如何将分区重新分配给放大集群中的代理。
流程
-
使用与集群中其他代理相同的设置为新代理创建配置文件,但
broker.id除外,它应该是未被任何其他代理使用的编号。 启动新的 Kafka 代理,将您在上一步中创建的配置文件作为
kafka-server-start.sh脚本的参数传递:su - kafka /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties验证 Kafka 代理是否正在运行。
jcmd | grep Kafka- 为每个新代理重复上述步骤。
使用
kafka-reassign-partitions.sh命令行工具运行分区重新分配。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --execute如果要节流复制,您还可以通过
--throttle选项,并带有 inter-broker throttled rate (以字节/秒为单位)。例如:/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 5000000 --execute此命令将输出两个重新分配 JSON 对象。第一个记录了正在移动的分区的当前分配。您应该在稍后恢复重新分配时将其保存到文件中。第二个 JSON 对象是您在重新分配 JSON 文件中传递的目标重新分配。
如果您需要在重新分配过程中更改节流,您可以使用带有不同节流率相同的命令行。例如:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 10000000 --execute使用
kafka-reassign-partitions.sh命令行工具定期验证重新分配是否已完成。这个命令与上一步中的命令相同,但使用--verify选项而不是--execute选项。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --verify例如:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --verify-
当
--verify命令将每个分区报告为成功完成时,重新分配已完成。最后的--verify也会使删除任何重新分配节流的影响。现在,如果您保存了 JSON 以将分配恢复到其原始代理,可以删除恢复的文件。
12.2.5. 缩减 Kafka 集群 复制链接链接已复制到粘贴板!
此流程描述了如何减少 Kafka 集群中的代理数量。
先决条件
- 现有的 Kafka 集群。
- 删除代理后,如何将分区重新分配给集群中代理的 重新分配 JSON 文件。
流程
使用
kafka-reassign-partitions.sh命令行工具运行分区重新分配。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --execute如果要节流复制,您还可以通过
--throttle选项,并带有 inter-broker throttled rate (以字节/秒为单位)。例如:/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 5000000 --execute此命令将输出两个重新分配 JSON 对象。第一个记录了正在移动的分区的当前分配。您应该在稍后恢复重新分配时将其保存到文件中。第二个 JSON 对象是您在重新分配 JSON 文件中传递的目标重新分配。
如果您需要在重新分配过程中更改节流,您可以使用带有不同节流率相同的命令行。例如:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --throttle 10000000 --execute使用
kafka-reassign-partitions.sh命令行工具定期验证重新分配是否已完成。这个命令与上一步中的命令相同,但使用--verify选项而不是--execute选项。/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server <bootstrap_address> --reassignment-json-file <reassignment_json_file> --verify例如:
/opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --verify-
当
--verify命令将每个分区报告为成功完成时,重新分配已完成。最后的--verify也会使删除任何重新分配节流的影响。现在,如果您保存了 JSON 以将分配恢复到其原始代理,可以删除恢复的文件。 检查正在删除的每个代理是否在其日志中没有任何实时分区(
log.dirs)。ls -l <LogDir> | grep -E '^d' | grep -vE '[a-zA-Z0-9.-]+\.[a-z0-9]+-delete$'如果日志目录与正则表达式
\ 不匹配。[a-z0-9]-delete$,则活跃分区仍然存在。如果您有活跃的分区,检查重新分配已完成,或者在重新分配 JSON 文件中的配置。您可以再次运行重新分配。在继续下一步之前,请确保没有活动的分区。停止代理。
su - kafka /opt/kafka/bin/kafka-server-stop.sh确认 Kafka 代理已经停止。
jcmd | grep kafka
12.2.6. 扩展 ZooKeeper 集群 复制链接链接已复制到粘贴板!
这个步骤描述了如何在 ZooKeeper 集群中添加服务器(节点)。ZooKeeper 的动态重新配置 功能在扩展过程中维护稳定的 ZooKeeper 集群。
先决条件
-
ZooKeeper 配置文件中启用了动态重新配置(
reconfigEnabled=true)。 - ZooKeeper 身份验证已启用,您可以使用身份验证机制访问新服务器。
流程
为每个 ZooKeeper 服务器执行以下步骤:
- 在 ZooKeeper 集群中添加服务器,如 第 4.1 节 “运行多节点 ZooKeeper 集群” 所述,然后启动 ZooKeeper。
- 请注意新服务器的 IP 地址和已配置的访问端口。
为服务器启动
zookeeper-shell会话。从可访问集群的机器中运行以下命令(如果已经访问,这可能是 ZooKeeper 节点或本地机器之一)。su - kafka /opt/kafka/bin/zookeeper-shell.sh <ip-address>:<zk-port>在 shell 会话中,运行 ZooKeeper 节点时,输入以下行来将新服务器作为投票成员添加到仲裁中:
reconfig -add server.<positive-id> = <address1>:<port1>:<port2>[:role];[<client-port-address>:]<client-port>例如:
reconfig -add server.4=172.17.0.4:2888:3888:participant;172.17.0.4:2181其中
<positive-id>是新服务器 ID4。对于两个端口,
<port1>2888 用于 ZooKeeper 服务器之间的通信,<port2>3888 用于领导选举机制。新配置传播到 ZooKeeper 集群中的其他服务器;新服务器现在是仲裁的完整成员。
- 对要添加的其他服务器重复步骤 1-4。
12.2.7. 缩减 ZooKeeper 集群 复制链接链接已复制到粘贴板!
这个步骤描述了如何从 ZooKeeper 集群中删除服务器(节点)。ZooKeeper 的动态重新配置 功能在缩减过程中维护稳定的 ZooKeeper 集群。
先决条件
-
ZooKeeper 配置文件中启用了动态重新配置(
reconfigEnabled=true)。 - ZooKeeper 身份验证已启用,您可以使用身份验证机制访问新服务器。
流程
为每个 ZooKeeper 服务器执行以下步骤:
登录到在缩减后会保留的其中一个服务器上的
zookeeper-shell(例如,服务器 1)。注意使用为 ZooKeeper 集群配置的验证机制访问服务器。
删除服务器,如服务器 5。
reconfig -remove 5- 取消激活您删除的服务器。
- 重复步骤 1-3 以减小集群大小。