Chapter 4. Running Training Operator-based distributed training workloads


To reduce the time needed to train a large language model (LLM), you can run training jobs in parallel. In Red Hat OpenShift AI, the Kubeflow Training Operator and the Kubeflow Training Operator Python Software Development Kit (Training Operator SDK) simplify job configuration.

You can use the Training Operator and the Training Operator SDK to configure a training job in a variety of ways. For example, you can use multiple GPUs per node, fine-tune a model, or configure a training job to use Remote Direct Memory Access (RDMA).

You can use the Training Operator PyTorchJob API to configure a PyTorchJob resource so that the training job runs on multiple nodes with multiple GPUs.

You can store the training script in a ConfigMap resource, or include it in a custom container image.
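
For reference, the following minimal sketch shows how a training job might be submitted with the Training Operator SDK from a Python environment. The package name (kubeflow-training), the TrainingClient class, and the parameter names shown here are assumptions that can vary between SDK versions; check the SDK documentation for your release.

# Minimal sketch: submitting a training job with the Training Operator SDK.
# The import path, class name, and parameters are assumptions and can differ by SDK version.
from kubeflow.training import TrainingClient

def train_func():
    # Placeholder training function; replace it with your own training logic.
    import torch
    print(f"CUDA available: {torch.cuda.is_available()}")

client = TrainingClient()  # Assumes the active Kubernetes context is used by default
client.create_job(
    name="pytorch-multi-node-job",  # Example job name used elsewhere in this chapter
    train_func=train_func,          # Function that runs in each training replica
    num_workers=2,                  # Number of worker replicas (assumption)
    resources_per_worker={"gpu": 2, "cpu": 4, "memory": "8Gi"},  # Assumed parameter format
)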

4.1.1. Creating a Training Operator PyTorch training script ConfigMap resource

You can create a ConfigMap resource to store the Training Operator PyTorch training script.

Note

Alternatively, you can include the training script in a custom container image by using the example Dockerfile, as described in Creating a custom training image.

Prerequisites

Procedure

  1. Log in to the OpenShift console.
  2. Create a ConfigMap resource, as follows:

    1. In the Administrator perspective, click Workloads → ConfigMaps.
    2. From the Project list, select your project.
    3. Click Create ConfigMap.
    4. In the Configure via section, select the YAML view option.

      The Create ConfigMap page opens, with default YAML code automatically added.

  3. Replace the default YAML code with your training-script code.

    For an example training script, see Example Training Operator PyTorch training scripts.

  4. Click Create.

Verification

  1. In the OpenShift console, in the Administrator perspective, click Workloads → ConfigMaps.
  2. From the Project list, select your project.
  3. Click the ConfigMap resource to display the training script details.
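
For reference, a ConfigMap resource that stores a training script might look like the following sketch. The resource name training-script-configmap and the file name train.py match the values used in later examples in this chapter; the script body shown here is only a placeholder:

apiVersion: v1
kind: ConfigMap
metadata:
  name: training-script-configmap
  namespace: test-namespace
data:
  train.py: |
    # Replace with your training script content
    import torch
    print(f"CUDA available: {torch.cuda.is_available()}")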

4.1.2. Creating a Training Operator PyTorchJob resource

You can create a PyTorchJob resource to run the Training Operator PyTorch training script.

Prerequisites

Procedure

  1. Log in to the OpenShift console.
  2. Create a PyTorchJob resource, as follows:

    1. In the Administrator perspective, click Home → Search.
    2. From the Project list, select your project.
    3. Click the Resources list, and in the search field, start typing PyTorchJob.
    4. Select PyTorchJob, and then click Create PyTorchJob.

      The Create PyTorchJob page opens, with default YAML code automatically added.

  3. Update the metadata to replace the name and namespace values with the values for your environment, as shown in the following example:

    metadata:
      name: pytorch-multi-node-job
      namespace: test-namespace
  4. Configure the master node, as shown in the following example:

    spec:
      pytorchReplicaSpecs:
        Master:
          replicas: 1
          restartPolicy: OnFailure
          template:
            metadata:
              labels:
                app: pytorch-multi-node-job
    1. In the replicas entry, specify 1. Only one master node is required.
    2. To use a ConfigMap resource to provide the training script for the PyTorchJob pods, add the ConfigMap volume-mount information, as shown in the following example:

      Adding the training script from the ConfigMap resource

      spec:
        pytorchReplicaSpecs:
          Master:
            ...
            template:
              spec:
                containers:
                - name: pytorch
                  image: quay.io/modh/training:py311-cuda121-torch241
                  command: ["python", "/workspace/scripts/train.py"]
                  volumeMounts:
                  - name: training-script-volume
                    mountPath: /workspace
                volumes:
                - name: training-script-volume
                  configMap:
                    name: training-script-configmap

    3. Add the appropriate resource constraints for your environment, as shown in the following example:

      Adding resource constraints

      spec:
        pytorchReplicaSpecs:
          Master:
            ...
            template:
              spec:
                containers:
                - name: pytorch
                  ...
                  resources:
                    requests:
                      cpu: "4"
                      memory: "8Gi"
                      nvidia.com/gpu: 2    # To use GPUs (Optional)
                    limits:
                      cpu: "4"
                      memory: "8Gi"
                      nvidia.com/gpu: 2

  5. Make similar edits in the Worker section of the PyTorchJob resource.

    1. Update the replicas entry to specify the number of worker nodes.

    For a complete example of a PyTorchJob resource, see Example Training Operator PyTorchJob resource for multi-node training.

  6. Click Create.

Verification

  1. In the OpenShift console, open the Administrator perspective.
  2. From the Project list, select your project.
  3. Click Home → Search, search for PyTorchJob, and verify that the job was created.
  4. Click Workloads → Pods, and verify that the requested head pod and worker pods are running.

4.1.3. Creating a Training Operator PyTorchJob resource by using the CLI

You can use the OpenShift command-line interface (CLI) to create a PyTorchJob resource to run the Training Operator PyTorch training script.

Prerequisites

Procedure

  1. Log in to the OpenShift CLI, as follows:

    Logging in to the OpenShift CLI

    oc login --token=<token> --server=<server>

    For information about how to find the server and token details, see Using the cluster server and token to authenticate.

  2. Create a file named train.py and populate it with your training script, as follows:

    Creating the training script

    cat <<EOF > train.py
    <paste your content here>
    EOF

    Replace <paste your content here> with the content of your training script.

    For an example training script, see Example Training Operator PyTorch training scripts.

  3. Create a ConfigMap resource to store the training script, as follows:

    Creating the ConfigMap resource

    oc create configmap training-script-configmap --from-file=train.py -n <your-namespace>

    Replace <your-namespace> with the name of your project.

  4. Create a file named pytorchjob.yaml to define the distributed training job settings, as follows:

    Defining the distributed training job

    cat <<EOF > pytorchjob.yaml
    <paste your content here>
    EOF

    Replace <paste your content here> with the content of your training job.

    For an example training job, see Example Training Operator PyTorchJob resource for multi-node training.

  5. Create the distributed training job, as follows:

    Creating the distributed training job

    oc apply -f pytorchjob.yaml

Verification

  1. Monitor the running distributed training job, as follows:

    Monitoring the distributed training job

    oc get pytorchjobs -n <your-namespace>

    Replace <your-namespace> with the name of your project. For more detailed status information, see the oc describe sketch after this verification procedure.

  2. Check the pod logs, as follows:

    Checking the pod logs

    oc logs <pod-name> -n <your-namespace>

    Replace <your-namespace> with the name of your project.

  3. When you want to delete the job, run the following command:

    Deleting the job

    oc delete pytorchjobs/pytorch-multi-node-job -n <your-namespace>

    Replace <your-namespace> with the name of your project.
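
In addition to the preceding steps, you can inspect the detailed status and events of the job with oc describe. The following sketch assumes the example job name pytorch-multi-node-job that is used elsewhere in this chapter:

Inspecting the job status

oc describe pytorchjob pytorch-multi-node-job -n <your-namespace>

Replace <your-namespace> with the name of your project.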

4.1.4. Example Training Operator PyTorch training scripts

The following examples show how to configure a PyTorch training script for NVIDIA Collective Communications Library (NCCL), Distributed Data Parallel (DDP), and Fully Sharded Data Parallel (FSDP) training jobs.

Note

If you have the required resources, you can run the example code without editing it.

Alternatively, you can modify the example code to specify the appropriate configuration for your training job.

4.1.4.1. Example Training Operator PyTorch training script: NCCL

This NVIDIA Collective Communications Library (NCCL) example returns the rank and tensor value for each accelerator.

import os
import torch
import torch.distributed as dist

def main():
    # Select backend dynamically: nccl for GPU, gloo for CPU
    backend = "nccl" if torch.cuda.is_available() else "gloo"

    # Initialize the process group
    dist.init_process_group(backend)

    # Get rank and world size
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # Select device dynamically
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    print(f"Running on rank {rank} out of {world_size} using {device} with backend {backend}.")

    # Initialize tensor on the selected device
    tensor = torch.zeros(1, device=device)

    if rank == 0:
        tensor += 1
        for i in range(1, world_size):
            dist.send(tensor, dst=i)
    else:
        dist.recv(tensor, src=0)

    print(f"Rank {rank}: Tensor value {tensor.item()} on {device}")

if __name__ == "__main__":
    main()

The backend value is automatically set to one of the following values:

  • nccl: Uses the NVIDIA Collective Communications Library (NCCL) for NVIDIA GPUs, or the ROCm Communication Collectives Library (RCCL) for AMD GPUs
  • gloo: Uses Gloo for CPUs

Note

Specify backend="nccl" for both NVIDIA GPUs and AMD GPUs.

For AMD GPUs, even though the backend value is set to nccl, the ROCm environment uses RCCL for communication.
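
If you want to try the NCCL example outside the cluster before you create a PyTorchJob resource, you can launch the script locally with torchrun. The following sketch is an example invocation only; it assumes a single machine with two GPUs (or two CPU processes when no GPU is available):

torchrun --nnodes=1 --nproc_per_node=2 train.py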

4.1.4.2. Example Training Operator PyTorch training script: DDP

This example shows how to configure a training script for a Distributed Data Parallel (DDP) training job.

import os
import sys
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch import nn, optim

# Enable verbose logging
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "INFO"

def setup_ddp():
    """Initialize the distributed process group dynamically."""
    backend = "nccl" if torch.cuda.is_available() else "gloo"
    dist.init_process_group(backend=backend)
    local_rank = int(os.environ["LOCAL_RANK"])
    world_size = dist.get_world_size()

    # Ensure correct device is set
    device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu")
    torch.cuda.set_device(local_rank) if torch.cuda.is_available() else None

    print(f"[Rank {local_rank}] Initialized with backend={backend}, world_size={world_size}")
    sys.stdout.flush()  # Ensure logs are visible in Kubernetes
    return local_rank, world_size, device

def cleanup():
    """Clean up the distributed process group."""
    dist.destroy_process_group()

class SimpleModel(nn.Module):
    """A simple model with multiple layers."""
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.layer1 = nn.Linear(1024, 512)
        self.layer2 = nn.Linear(512, 256)
        self.layer3 = nn.Linear(256, 128)
        self.layer4 = nn.Linear(128, 64)
        self.output = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.layer1(x))
        x = torch.relu(self.layer2(x))
        x = torch.relu(self.layer3(x))
        x = torch.relu(self.layer4(x))
        return self.output(x)

def log_ddp_parameters(model, rank):
    """Log model parameter count for DDP."""
    num_params = sum(p.numel() for p in model.parameters())
    print(f"[Rank {rank}] Model has {num_params} parameters (replicated across all ranks)")
    sys.stdout.flush()

def log_memory_usage(rank):
    """Log GPU memory usage if CUDA is available."""
    if torch.cuda.is_available():
        torch.cuda.synchronize()
        print(f"[Rank {rank}] GPU Memory Allocated: {torch.cuda.memory_allocated() / 1e6} MB")
        print(f"[Rank {rank}] GPU Memory Reserved: {torch.cuda.memory_reserved() / 1e6} MB")
        sys.stdout.flush()

def main():
    local_rank, world_size, device = setup_ddp()

    # Initialize model and wrap with DDP
    model = SimpleModel().to(device)
    model = DDP(model, device_ids=[local_rank] if torch.cuda.is_available() else None)

    print(f"[Rank {local_rank}] DDP Initialized")
    log_ddp_parameters(model, local_rank)
    log_memory_usage(local_rank)

    # Optimizer and criterion
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    # Dummy dataset (adjust for real-world use case)
    x = torch.randn(32, 1024).to(device)
    y = torch.randn(32, 1).to(device)

    # Training loop
    for epoch in range(5):
        model.train()
        optimizer.zero_grad()

        # Forward pass
        outputs = model(x)
        loss = criterion(outputs, y)

        # Backward pass
        loss.backward()
        optimizer.step()

        print(f"[Rank {local_rank}] Epoch {epoch}, Loss: {loss.item()}")
        log_memory_usage(local_rank)  # Track memory usage

        sys.stdout.flush()  # Ensure logs appear in real-time

    cleanup()

if __name__ == "__main__":
    main()

4.1.4.3. Example Training Operator PyTorch training script: FSDP

This example shows how to configure a training script for a Fully Sharded Data Parallel (FSDP) training job.

import os
import sys
import torch
import torch.distributed as dist
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP, CPUOffload
from torch.distributed.fsdp.wrap import always_wrap_policy
from torch import nn, optim

# Enable verbose logging for debugging
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "INFO"  # Enables detailed FSDP logs

def setup_ddp():
    """Initialize the distributed process group dynamically."""
    backend = "nccl" if torch.cuda.is_available() else "gloo"
    dist.init_process_group(backend=backend)
    local_rank = int(os.environ["LOCAL_RANK"])
    world_size = dist.get_world_size()

    # Ensure the correct device is set
    device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu")
    torch.cuda.set_device(local_rank) if torch.cuda.is_available() else None

    print(f"[Rank {local_rank}] Initialized with backend={backend}, world_size={world_size}")
    sys.stdout.flush()  # Ensure logs are visible in Kubernetes
    return local_rank, world_size, device

def cleanup():
    """Clean up the distributed process group."""
    dist.destroy_process_group()

class SimpleModel(nn.Module):
    """A simple model with multiple layers."""
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.layer1 = nn.Linear(1024, 512)
        self.layer2 = nn.Linear(512, 256)
        self.layer3 = nn.Linear(256, 128)
        self.layer4 = nn.Linear(128, 64)
        self.output = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.layer1(x))
        x = torch.relu(self.layer2(x))
        x = torch.relu(self.layer3(x))
        x = torch.relu(self.layer4(x))
        return self.output(x)

def log_fsdp_parameters(model, rank):
    """Log FSDP parameters and sharding strategy."""
    num_params = sum(p.numel() for p in model.parameters())
    print(f"[Rank {rank}] Model has {num_params} parameters (sharded across {dist.get_world_size()} workers)")
    sys.stdout.flush()

def log_memory_usage(rank):
    """Log GPU memory usage if CUDA is available."""
    if torch.cuda.is_available():
        torch.cuda.synchronize()
        print(f"[Rank {rank}] GPU Memory Allocated: {torch.cuda.memory_allocated() / 1e6} MB")
        print(f"[Rank {rank}] GPU Memory Reserved: {torch.cuda.memory_reserved() / 1e6} MB")
        sys.stdout.flush()

def main():
    local_rank, world_size, device = setup_ddp()

    # Initialize model and wrap with FSDP
    model = SimpleModel().to(device)
    model = FSDP(
        model,
        cpu_offload=CPUOffload(offload_params=not torch.cuda.is_available()),  # Offload if no GPU
        auto_wrap_policy=always_wrap_policy,  # Wrap all layers automatically
    )

    print(f"[Rank {local_rank}] FSDP Initialized")
    log_fsdp_parameters(model, local_rank)
    log_memory_usage(local_rank)

    # Optimizer and criterion
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    # Dummy dataset (adjust for real-world use case)
    x = torch.randn(32, 1024).to(device)
    y = torch.randn(32, 1).to(device)

    # Training loop
    for epoch in range(5):
        model.train()
        optimizer.zero_grad()

        # Forward pass
        outputs = model(x)
        loss = criterion(outputs, y)

        # Backward pass
        loss.backward()
        optimizer.step()

        print(f"[Rank {local_rank}] Epoch {epoch}, Loss: {loss.item()}")
        log_memory_usage(local_rank)  # Track memory usage

        sys.stdout.flush()  # Ensure logs appear in real-time

    cleanup()

if __name__ == "__main__":
    main()

4.1.5. Example Dockerfile for a Training Operator training script

You can use this example Dockerfile to include the training script in a custom training image.

FROM quay.io/modh/training:py311-cuda121-torch241
WORKDIR /workspace
COPY train.py /workspace/train.py
CMD ["python", "train.py"]

This example copies the training script to the default PyTorch image and runs the script.

For more information about how to use this Dockerfile to include the training script in a custom container image, see Creating a custom training image.
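
For example, you might build the custom image and push it to a registry that your cluster can pull from, as shown in the following sketch. The image name and registry path are placeholders, not values defined in this document:

podman build -t <registry>/<namespace>/custom-training:latest .
podman push <registry>/<namespace>/custom-training:latest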

4.1.6. Example Training Operator PyTorchJob resource for multi-node training

This example shows how to create a Training Operator PyTorch training job that runs on multiple nodes with multiple GPUs.

apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-multi-node-job
  namespace: test-namespace
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        metadata:
          labels:
            app: pytorch-multi-node-job
        spec:
          containers:
          - name: pytorch
            image: quay.io/modh/training:py311-cuda121-torch241
            imagePullPolicy: IfNotPresent
            command: ["torchrun", "/workspace/train.py"]
            volumeMounts:
              - name: training-script-volume
                mountPath: /workspace
            resources:
              requests:
                cpu: "4"
                memory: "8Gi"
                nvidia.com/gpu: "2"
              limits:
                cpu: "4"
                memory: "8Gi"
                nvidia.com/gpu: "2"
          volumes:
            - name: training-script-volume
              configMap:
                name: training-script-configmap
    Worker:
      replicas: 1
      restartPolicy: OnFailure
      template:
        metadata:
          labels:
            app: pytorch-multi-node-job
        spec:
          containers:
          - name: pytorch
            image: quay.io/modh/training:py311-cuda121-torch241
            imagePullPolicy: IfNotPresent
            command: ["torchrun", "/workspace/train.py"]
            volumeMounts:
              - name: training-script-volume
                mountPath: /workspace
            resources:
              requests:
                cpu: "4"
                memory: "8Gi"
                nvidia.com/gpu: "2"
              limits:
                cpu: "4"
                memory: "8Gi"
                nvidia.com/gpu: "2"
          volumes:
            - name: training-script-volume
              configMap:
                name: training-script-configmap