第 4 章 运行基于培训 Operator 的分布式培训工作负载
要减少培训大型语言模型(LLM)所需的时间,您可以并行运行培训工作。在 Red Hat OpenShift AI 中,Kubeflow Training Operator 和 Kubeflow Training Operator Python Software Development Kit (training Operator SDK)简化了作业配置。
您可以使用 Training Operator 和 Training Operator SDK 以各种方式配置培训作业。例如,您可以每个节点使用多个 GPU,微调模型,或者将培训作业配置为使用远程直接内存访问(RDMA)。
4.1. 使用 Kubeflow Training Operator 运行分布式培训工作负载 复制链接链接已复制到粘贴板!
您可以使用 Training Operator PyTorchJob API 配置 PyTorchJob 资源,以便培训作业在多个带有多个 GPU 的节点上运行。
您可以将 training 脚本存储在 ConfigMap 资源中,或者将其包含在自定义容器镜像中。
您可以创建一个 ConfigMap 资源来存储 Training Operator PyTorch training 脚本。
或者,您可以使用 示例 Dockerfile 将培训脚本包含在自定义容器镜像中,如 创建自定义培训镜像 中所述。
先决条件
- 您的集群管理员已安装了带有所需分布式培训组件的 Red Hat OpenShift AI,如 安装分布式工作负载组件 中所述。
- 您可以访问安装 OpenShift AI 的集群的 OpenShift 控制台。
流程
- 登录 OpenShift 控制台。
创建
ConfigMap资源,如下所示:-
在 Administrator 视角中,点 Workloads
ConfigMaps。 - 从 Project 列表中,选择您的项目。
- 点 Create ConfigMap。
在 Configure via 部分中,选择 YAML view 选项。
Create ConfigMap 页面将打开,默认的 YAML 代码会自动添加。
-
在 Administrator 视角中,点 Workloads
将默认 YAML 代码替换为您的 training-script 代码。
有关培训脚本示例,请参阅 培训 Operator PyTorch 培训脚本示例。
- 点 Create。
验证
-
在 OpenShift 控制台中,在 Administrator 视角中点 Workloads
ConfigMaps。 - 从 Project 列表中,选择您的项目。
- 点 ConfigMap 资源显示培训脚本详情。
4.1.2. 创建 Training Operator PyTorchJob 资源 复制链接链接已复制到粘贴板!
您可以创建一个 PyTorchJob 资源来运行 Training Operator PyTorch training 脚本。
先决条件
- 您可以使用支持的 NVIDIA GPU 或 AMD GPU 访问具有多个 worker 节点的 OpenShift 集群。
集群管理员配置了集群,如下所示:
- 安装了带有所需分布式培训组件的 Red Hat OpenShift AI,如 安装分布式工作负载组件 中所述。
- 配置分布式培训资源,如 管理分布式工作负载 中所述。
- 您可以访问适合分布式培训的工作台,如 为分布式培训 创建工作台 中所述。
具有数据科学项目的管理员访问权限。
- 如果创建项目,则会自动具有管理员访问权限。
- 如果没有创建项目,您的集群管理员必须授予管理员访问权限。
流程
- 登录 OpenShift 控制台。
创建
PyTorchJob资源,如下所示:-
在 Administrator 视角中,点击 Home
Search。 - 从 Project 列表中,选择您的项目。
-
单击 Resources 列表,然后在搜索字段中开始输入
PyTorchJob。 选择 PyTorchJob,然后单击 Create PyTorchJob。
Create PyTorchJob 页面将打开,自动添加默认 YAML 代码。
-
在 Administrator 视角中,点击 Home
更新元数据,将
name和namespace值替换为您的环境的值,如下例所示:metadata: name: pytorch-multi-node-job namespace: test-namespace配置 master 节点,如下例所示:
spec: pytorchReplicaSpecs: Master: replicas: 1 restartPolicy: OnFailure template: metadata: labels: app: pytorch-multi-node-job-
在
replicas条目中,指定1。只需要一个 master 节点。 要使用 ConfigMap 资源为 PyTorchJob pod 提供培训脚本,请添加 ConfigMap 卷挂载信息,如下例所示:
从 ConfigMap 资源添加培训脚本
Spec: pytorchReplicaSpecs: Master: ... template: spec: containers: - name: pytorch image: quay.io/modh/training:py311-cuda124-torch251 command: ["python", "/workspace/scripts/train.py"] volumeMounts: - name: training-script-volume mountPath: /workspace volumes: - name: training-script-volume configMap: name: training-script-configmap为您的环境添加适当的资源限制,如下例所示:
添加资源 contraints
SSpec: pytorchReplicaSpecs: Master: ... template: spec: containers: ... resources: requests: cpu: "4" memory: "8Gi" nvidia.com/gpu: 2 # To use GPUs (Optional) limits: cpu: "4" memory: "8Gi" nvidia.com/gpu: 2
-
在
在
PyTorchJob资源的Worker部分中进行类似的编辑。-
更新
replicas条目,以指定 worker 节点的数量。
有关
PyTorchJob资源的完整示例,请参阅 multi-node 培训的 Training Operator PyTorchJob 资源示例。-
更新
- 点 Create。
验证
- 在 OpenShift 控制台中,打开 Administrator 视角。
- 从 Project 列表中,选择您的项目。
-
点 Home
Search PyTorchJob,并验证作业是否已创建。 -
点 Workloads
Pods,验证请求的 head pod 和 worker pod 是否正在运行。
4.1.3. 使用 CLI 创建 Training Operator PyTorchJob 资源 复制链接链接已复制到粘贴板!
您可以使用 OpenShift 命令行界面(CLI)创建 PyTorchJob 资源,以运行 Training Operator PyTorch training 脚本。
先决条件
- 您可以使用支持的 NVIDIA GPU 或 AMD GPU 访问具有多个 worker 节点的 OpenShift 集群。
集群管理员配置了集群,如下所示:
- 安装了带有所需分布式培训组件的 Red Hat OpenShift AI,如 安装分布式工作负载组件 中所述。
- 配置分布式培训资源,如 管理分布式工作负载 中所述。
- 您可以访问适合分布式培训的工作台,如 为分布式培训 创建工作台 中所述。
具有数据科学项目的管理员访问权限。
- 如果创建项目,则会自动具有管理员访问权限。
- 如果没有创建项目,您的集群管理员必须授予管理员访问权限。
- 您已下载并安装 OpenShift 命令行界面(CLI),如 安装 OpenShift CLI (OpenShift Dedicated)或 安装 OpenShift CLI (Red Hat OpenShift Service on AWS) 所述。
流程
登录到 OpenShift CLI,如下所示:
登录到 OpenShift CLI
oc login --token=<token> --server=<server>有关如何查找服务器和令牌详情的详情,请参考使用集群服务器和令牌进行身份验证。
创建名为 training
.py的文件,并使用您的培训脚本填充该文件,如下所示:创建培训脚本
cat <<EOF > train.py <paste your content here> EOF将 <paste your content here& gt; 替换为您的培训脚本内容。
有关培训脚本示例,请参阅 培训 Operator PyTorch 培训脚本示例。
创建
ConfigMap资源来存储培训脚本,如下所示:创建 ConfigMap 资源
oc create configmap training-script-configmap --from-file=train.py -n <your-namespace>将 <your-namespace > 替换为项目的名称。
创建名为
pytorchjob.yaml的文件来定义分布式培训作业设置,如下所示:定义分布式培训工作
cat <<EOF > pytorchjob.py <paste your content here> EOF将 <paste your content here > 替换为您的培训工作内容。
有关示例培训作业,请参阅多节点培训培训 Operator PyTorchJob 资源示例。
创建分布式培训工作,如下所示:
创建分布式培训工作
oc apply -f pytorchjob.yaml
验证
监控正在运行的分布式培训作业,如下所示:
监控分布式培训工作
oc get pytorchjobs -n <your-namespace>将 <your-namespace > 替换为项目的名称。
检查 pod 日志,如下所示:
检查 pod 日志
oc logs <pod-name> -n <your-namespace>将 <your-namespace > 替换为项目的名称。
当您要删除作业时,运行以下命令:
删除作业
oc delete pytorchjobs/pytorch-multi-node-job -n <your-namespace>将 <your-namespace > 替换为项目的名称。
4.1.4. 培训 Operator PyTorch 培训脚本示例 复制链接链接已复制到粘贴板!
以下示例演示了如何为 NVIDIA Collective Communications Library (NCCL)、分布式数据并行(DDP)和 Fully Sharded Data Parallel (FSDP)培训工作配置 PyTorch 培训脚本。
如果您有所需资源,可以在不编辑的情况下运行示例代码。
或者,您可以修改示例代码,以指定适合您的培训工作配置。
4.1.4.1. 培训 Operator PyTorch 培训脚本示例:NCCL 复制链接链接已复制到粘贴板!
此 NVIDIA Collective Communications Library (NCCL)示例返回每个加速器的 rank 和 10sor 值。
import os
import torch
import torch.distributed as dist
def main():
# Select backend dynamically: nccl for GPU, gloo for CPU
backend = "nccl" if torch.cuda.is_available() else "gloo"
# Initialize the process group
dist.init_process_group(backend)
# Get rank and world size
rank = dist.get_rank()
world_size = dist.get_world_size()
# Select device dynamically
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Running on rank {rank} out of {world_size} using {device} with backend {backend}.")
# Initialize tensor on the selected device
tensor = torch.zeros(1, device=device)
if rank == 0:
tensor += 1
for i in range(1, world_size):
dist.send(tensor, dst=i)
else:
dist.recv(tensor, src=0)
print(f"Rank {rank}: Tensor value {tensor.item()} on {device}")
if name == "main":
main()
backend 值会自动设置为以下值之一:
-
nccl:将 NVIDIA Collective Communications Library (NCCL)用于 NVIDIA GPU 或 ROCm communications Collectives Library (RCCL)用于 AMD GPU -
gloo: 对 CPU 使用 Gloo
为 NVIDIA GPU 和 AMD GPU 指定 backend="nccl"。
对于 AMD GPU,即使 backend 值设置为 nccl,ROCm 环境使用 RCCL 进行通信。
4.1.4.2. 培训 Operator PyTorch 培训脚本示例:DDP 复制链接链接已复制到粘贴板!
本例演示了如何为分布式数据并行(DDP)培训作业配置培训脚本。
import os
import sys
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch import nn, optim
# Enable verbose logging
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "INFO"
def setup_ddp():
"""Initialize the distributed process group dynamically."""
backend = "nccl" if torch.cuda.is_available() else "gloo"
dist.init_process_group(backend=backend)
local_rank = int(os.environ["LOCAL_RANK"])
world_size = dist.get_world_size()
# Ensure correct device is set
device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu")
torch.cuda.set_device(local_rank) if torch.cuda.is_available() else None
print(f"[Rank {local_rank}] Initialized with backend={backend}, world_size={world_size}")
sys.stdout.flush() # Ensure logs are visible in Kubernetes
return local_rank, world_size, device
def cleanup():
"""Clean up the distributed process group."""
dist.destroy_process_group()
class SimpleModel(nn.Module):
"""A simple model with multiple layers."""
def init(self):
super(SimpleModel, self).init()
self.layer1 = nn.Linear(1024, 512)
self.layer2 = nn.Linear(512, 256)
self.layer3 = nn.Linear(256, 128)
self.layer4 = nn.Linear(128, 64)
self.output = nn.Linear(64, 1)
def forward(self, x):
x = torch.relu(self.layer1(x))
x = torch.relu(self.layer2(x))
x = torch.relu(self.layer3(x))
x = torch.relu(self.layer4(x))
return self.output(x)
def log_ddp_parameters(model, rank):
"""Log model parameter count for DDP."""
num_params = sum(p.numel() for p in model.parameters())
print(f"[Rank {rank}] Model has {num_params} parameters (replicated across all ranks)")
sys.stdout.flush()
def log_memory_usage(rank):
"""Log GPU memory usage if CUDA is available."""
if torch.cuda.is_available():
torch.cuda.synchronize()
print(f"[Rank {rank}] GPU Memory Allocated: {torch.cuda.memory_allocated() / 1e6} MB")
print(f"[Rank {rank}] GPU Memory Reserved: {torch.cuda.memory_reserved() / 1e6} MB")
sys.stdout.flush()
def main():
local_rank, world_size, device = setup_ddp()
# Initialize model and wrap with DDP
model = SimpleModel().to(device)
model = DDP(model, device_ids=[local_rank] if torch.cuda.is_available() else None)
print(f"[Rank {local_rank}] DDP Initialized")
log_ddp_parameters(model, local_rank)
log_memory_usage(local_rank)
# Optimizer and criterion
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()
# Dummy dataset (adjust for real-world use case)
x = torch.randn(32, 1024).to(device)
y = torch.randn(32, 1).to(device)
# Training loop
for epoch in range(5):
model.train()
optimizer.zero_grad()
# Forward pass
outputs = model(x)
loss = criterion(outputs, y)
# Backward pass
loss.backward()
optimizer.step()
print(f"[Rank {local_rank}] Epoch {epoch}, Loss: {loss.item()}")
log_memory_usage(local_rank) # Track memory usage
sys.stdout.flush() # Ensure logs appear in real-time
cleanup()
if name == "main":
main()
4.1.4.3. 培训 Operator PyTorch 培训脚本示例:FSDP 复制链接链接已复制到粘贴板!
本例演示了如何为 Fully Sharded Data Parallel (FSDP)培训工作配置培训脚本。
import os
import sys
import torch
import torch.distributed as dist
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP, CPUOffload
from torch.distributed.fsdp.wrap import always_wrap_policy
from torch import nn, optim
# Enable verbose logging for debugging
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "INFO" # Enables detailed FSDP logs
def setup_ddp():
"""Initialize the distributed process group dynamically."""
backend = "nccl" if torch.cuda.is_available() else "gloo"
dist.init_process_group(backend=backend)
local_rank = int(os.environ["LOCAL_RANK"])
world_size = dist.get_world_size()
# Ensure the correct device is set
device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu")
torch.cuda.set_device(local_rank) if torch.cuda.is_available() else None
print(f"[Rank {local_rank}] Initialized with backend={backend}, world_size={world_size}")
sys.stdout.flush() # Ensure logs are visible in Kubernetes
return local_rank, world_size, device
def cleanup():
"""Clean up the distributed process group."""
dist.destroy_process_group()
class SimpleModel(nn.Module):
"""A simple model with multiple layers."""
def init(self):
super(SimpleModel, self).init()
self.layer1 = nn.Linear(1024, 512)
self.layer2 = nn.Linear(512, 256)
self.layer3 = nn.Linear(256, 128)
self.layer4 = nn.Linear(128, 64)
self.output = nn.Linear(64, 1)
def forward(self, x):
x = torch.relu(self.layer1(x))
x = torch.relu(self.layer2(x))
x = torch.relu(self.layer3(x))
x = torch.relu(self.layer4(x))
return self.output(x)
def log_fsdp_parameters(model, rank):
"""Log FSDP parameters and sharding strategy."""
num_params = sum(p.numel() for p in model.parameters())
print(f"[Rank {rank}] Model has {num_params} parameters (sharded across {dist.get_world_size()} workers)")
sys.stdout.flush()
def log_memory_usage(rank):
"""Log GPU memory usage if CUDA is available."""
if torch.cuda.is_available():
torch.cuda.synchronize()
print(f"[Rank {rank}] GPU Memory Allocated: {torch.cuda.memory_allocated() / 1e6} MB")
print(f"[Rank {rank}] GPU Memory Reserved: {torch.cuda.memory_reserved() / 1e6} MB")
sys.stdout.flush()
def main():
local_rank, world_size, device = setup_ddp()
# Initialize model and wrap with FSDP
model = SimpleModel().to(device)
model = FSDP(
model,
cpu_offload=CPUOffload(offload_params=not torch.cuda.is_available()), # Offload if no GPU
auto_wrap_policy=always_wrap_policy, # Wrap all layers automatically
)
print(f"[Rank {local_rank}] FSDP Initialized")
log_fsdp_parameters(model, local_rank)
log_memory_usage(local_rank)
# Optimizer and criterion
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()
# Dummy dataset (adjust for real-world use case)
x = torch.randn(32, 1024).to(device)
y = torch.randn(32, 1).to(device)
# Training loop
for epoch in range(5):
model.train()
optimizer.zero_grad()
# Forward pass
outputs = model(x)
loss = criterion(outputs, y)
# Backward pass
loss.backward()
optimizer.step()
print(f"[Rank {local_rank}] Epoch {epoch}, Loss: {loss.item()}")
log_memory_usage(local_rank) # Track memory usage
sys.stdout.flush() # Ensure logs appear in real-time
cleanup()
if name == "main":
main()
4.1.5. 培训 Operator PyTorch 培训脚本的 Dockerfile 示例 复制链接链接已复制到粘贴板!
您可以使用此示例 Dockerfile 将培训脚本包含在自定义培训镜像中。
FROM quay.io/modh/training:py311-cuda124-torch251
WORKDIR /workspace
COPY train.py /workspace/train.py
CMD ["python", "train.py"]
这个示例将培训脚本复制到默认的 PyTorch 镜像,并运行 脚本。
有关如何使用此 Dockerfile 在自定义容器镜像中包含培训脚本的更多信息,请参阅 创建自定义培训镜像。
4.1.6. 用于多节点培训的 Training Operator PyTorchJob 资源示例 复制链接链接已复制到粘贴板!
本例演示了如何创建在具有多个 GPU 的多个节点上运行的培训 Operator PyTorch 培训作业。
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
name: pytorch-multi-node-job
namespace: test-namespace
spec:
pytorchReplicaSpecs:
Master:
replicas: 1
restartPolicy: OnFailure
template:
metadata:
labels:
app: pytorch-multi-node-job
spec:
containers:
- name: pytorch
image: quay.io/modh/training:py311-cuda124-torch251
imagePullPolicy: IfNotPresent
command: ["torchrun", "/workspace/train.py"]
volumeMounts:
- name: training-script-volume
mountPath: /workspace
resources:
requests:
cpu: "4"
memory: "8Gi"
nvidia.com/gpu: "2"
limits:
cpu: "4"
memory: "8Gi"
nvidia.com/gpu: "2"
volumes:
- name: training-script-volume
configMap:
name: training-script-configmap
Worker:
replicas: 1
restartPolicy: OnFailure
template:
metadata:
labels:
app: pytorch-multi-node-job
spec:
containers:
- name: pytorch
image: quay.io/modh/training:py311-cuda124-torch251
imagePullPolicy: IfNotPresent
command: ["torchrun", "/workspace/train.py"]
volumeMounts:
- name: training-script-volume
mountPath: /workspace
resources:
requests:
cpu: "4"
memory: "8Gi"
nvidia.com/gpu: "2"
limits:
cpu: "4"
memory: "8Gi"
nvidia.com/gpu: "2"
volumes:
- name: training-script-volume
configMap:
name: training-script-configmap