
Chapter 4. Running Training Operator-based distributed training workloads


To reduce the time needed to train a Large Language Model (LLM), you can run the training job in parallel. In Red Hat OpenShift AI, the Kubeflow Training Operator and Kubeflow Training Operator Python Software Development Kit (Training Operator SDK) simplify the job configuration.

You can use the Training Operator and the Training Operator SDK to configure a training job in a variety of ways. For example, you can use multiple nodes and multiple GPUs per node, fine-tune a model, or configure a training job to use Remote Direct Memory Access (RDMA).

You can use the Training Operator PyTorchJob API to configure a PyTorchJob resource so that the training job runs on multiple nodes with multiple GPUs.

You can store the training script in a ConfigMap resource, or include it in a custom container image.

You can create a ConfigMap resource to store the Training Operator PyTorch training script.

Note

Alternatively, you can use the example Dockerfile to include the training script in a custom container image, as described in Creating a custom training image.

Prerequisites

Procedure

  1. Log in to the OpenShift Console.
  2. Create a ConfigMap resource, as follows:

    1. In the Administrator perspective, click Workloads > ConfigMaps.
    2. From the Project list, select your project.
    3. Click Create ConfigMap.
    4. In the Configure via section, select the YAML view option.

      The Create ConfigMap page opens, with default YAML code automatically added.

  3. Replace the default YAML code with your training-script code.

    For example training scripts, see Example Training Operator PyTorch training scripts.

  4. Click Create.
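For reference, the resulting ConfigMap has roughly the following shape. The name and namespace match the examples used later in this chapter; the script body itself is elided:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: training-script-configmap
  namespace: test-namespace
data:
  train.py: |
    # Your training script goes here, indented under the train.py key.
```

The key name (train.py) becomes the file name when the ConfigMap is mounted into the PyTorchJob pods.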

Verification

  1. In the OpenShift Console, in the Administrator perspective, click Workloads > ConfigMaps.
  2. From the Project list, select your project.
  3. Click your ConfigMap resource to display the training script details.

4.1.2. Creating a Training Operator PyTorchJob resource

You can create a PyTorchJob resource to run the Training Operator PyTorch training script.

Prerequisites

  • You can access an OpenShift cluster that has multiple worker nodes with supported NVIDIA GPUs or AMD GPUs.
  • Your cluster administrator has configured the cluster as follows:

  • You can access a workbench that is suitable for distributed training, as described in Creating a workbench for distributed training.
  • You have administrator access for the project.

    • If you created the project, you automatically have administrator access.
    • If you did not create the project, your cluster administrator must give you administrator access.

Procedure

  1. Log in to the OpenShift Console.
  2. Create a PyTorchJob resource, as follows:

    1. In the Administrator perspective, click Home > Search.
    2. From the Project list, select your project.
    3. Click the Resources list, and in the search field, start typing PyTorchJob.
    4. Select PyTorchJob, and click Create PyTorchJob.

      The Create PyTorchJob page opens, with default YAML code automatically added.

  3. Update the metadata to replace the name and namespace values with the values for your environment, as shown in the following example:

    metadata:
      name: pytorch-multi-node-job
      namespace: test-namespace
  4. Configure the master node, as shown in the following example:

    spec:
      pytorchReplicaSpecs:
        Master:
          replicas: 1
          restartPolicy: OnFailure
          template:
            metadata:
              labels:
                app: pytorch-multi-node-job
    1. In the replicas entry, specify 1. Only one master node is needed.
    2. To use a ConfigMap resource to provide the training script for the PyTorchJob pods, add the ConfigMap volume mount information, as shown in the following example:

      Adding the training script from a ConfigMap resource

      spec:
        pytorchReplicaSpecs:
          Master:
            ...
            template:
              spec:
                containers:
                - name: pytorch
                  image: quay.io/modh/training:py311-cuda124-torch251
                  command: ["python", "/workspace/scripts/train.py"]
                  volumeMounts:
                  - name: training-script-volume
                    mountPath: /workspace
                volumes:
                - name: training-script-volume
                  configMap:
                    name: training-script-configmap

    3. Add the appropriate resource constraints for your environment, as shown in the following example:

      Adding the resource constraints

      spec:
        pytorchReplicaSpecs:
          Master:
            ...
            template:
              spec:
                containers:
                - name: pytorch
                  ...
                  resources:
                    requests:
                      cpu: "4"
                      memory: "8Gi"
                      nvidia.com/gpu: 2    # Optional: to use GPUs
                    limits:
                      cpu: "4"
                      memory: "8Gi"
                      nvidia.com/gpu: 2

  5. Make similar edits in the Worker section of the PyTorchJob resource.

    1. Update the replicas entry to specify the number of worker nodes.

    For a complete example PyTorchJob resource, see Example Training Operator PyTorchJob resource for multi-node training.

  6. Click Create.

Verification

  1. In the OpenShift Console, open the Administrator perspective.
  2. From the Project list, select your project.
  3. Click Home > Search, select PyTorchJob, and verify that the job was created.
  4. Click Workloads > Pods and verify that the requested master pod and worker pods are running.

You can use the OpenShift CLI (oc) to create a PyTorchJob resource to run the Training Operator PyTorch training script.

Prerequisites

Procedure

  1. Log in to the OpenShift CLI (oc), as follows:

    Logging in to the OpenShift CLI (oc)

    oc login --token=<token> --server=<server>

    For information about how to find the server and token details, see Using the cluster server and token to authenticate.

  2. Create a file named train.py and populate it with your training script, as follows:

    Creating the training script

    cat <<EOF > train.py
    <paste your content here>
    EOF

    Replace <paste your content here> with your training script content.

    For example training scripts, see Example Training Operator PyTorch training scripts.

  3. Create a ConfigMap resource to store the training script, as follows:

    Creating the ConfigMap resource

    oc create configmap training-script-configmap --from-file=train.py -n <your-namespace>

    Replace <your-namespace> with the name of your project.

  4. Create a file named pytorchjob.yaml to define the distributed training job setup, as follows:

    Defining the distributed training job

    cat <<EOF > pytorchjob.yaml
    <paste your content here>
    EOF

    Replace <paste your content here> with your training job content.

    For an example training job, see Example Training Operator PyTorchJob resource for multi-node training.

  5. Create the distributed training job, as follows:

    Creating the distributed training job

    oc apply -f pytorchjob.yaml

Verification

  1. Monitor the running distributed training job, as follows:

    Monitoring the distributed training job

    oc get pytorchjobs -n <your-namespace>

    Replace <your-namespace> with the name of your project.

  2. Check the pod logs, as follows:

    Checking the pod logs

    oc logs <pod-name> -n <your-namespace>

    Replace <your-namespace> with the name of your project.

  3. When you want to delete the job, run the following command:

    Deleting the job

    oc delete pytorchjobs/pytorch-multi-node-job -n <your-namespace>

    Replace <your-namespace> with the name of your project.

4.1.4. Example Training Operator PyTorch training scripts

The following examples show how to configure a PyTorch training script for NVIDIA Collective Communications Library (NCCL), Distributed Data Parallel (DDP), and Fully Sharded Data Parallel (FSDP) training jobs.

Note

If you have the required resources, you can run the example code without editing it.

Alternatively, you can modify the example code to specify the appropriate configuration for your training job.

4.1.4.1. Example Training Operator PyTorch training script: NCCL

This NVIDIA Collective Communications Library (NCCL) example returns the rank and tensor value for each accelerator.

import os
import torch
import torch.distributed as dist

def main():
    # Select backend dynamically: nccl for GPU, gloo for CPU
    backend = "nccl" if torch.cuda.is_available() else "gloo"

    # Initialize the process group
    dist.init_process_group(backend)

    # Get rank and world size
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # Select device dynamically
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    print(f"Running on rank {rank} out of {world_size} using {device} with backend {backend}.")

    # Initialize tensor on the selected device
    tensor = torch.zeros(1, device=device)

    if rank == 0:
        tensor += 1
        for i in range(1, world_size):
            dist.send(tensor, dst=i)
    else:
        dist.recv(tensor, src=0)

    print(f"Rank {rank}: Tensor value {tensor.item()} on {device}")

if __name__ == "__main__":
    main()

The backend value is automatically set to one of the following values:

  • nccl: Uses NVIDIA Collective Communications Library (NCCL) for NVIDIA GPUs or ROCm Communication Collectives Library (RCCL) for AMD GPUs
  • gloo: Uses Gloo for CPUs
Note

Specify backend="nccl" for both NVIDIA GPUs and AMD GPUs.

For AMD GPUs, even though the backend value is set to nccl, the ROCm environment uses RCCL for communication.

4.1.4.2. Example Training Operator PyTorch training script: DDP

This example shows how to configure a training script for a Distributed Data Parallel (DDP) training job.

import os
import sys
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch import nn, optim

# Enable verbose logging
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "INFO"

def setup_ddp():
    """Initialize the distributed process group dynamically."""
    backend = "nccl" if torch.cuda.is_available() else "gloo"
    dist.init_process_group(backend=backend)
    local_rank = int(os.environ["LOCAL_RANK"])
    world_size = dist.get_world_size()

    # Ensure correct device is set
    device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu")
    if torch.cuda.is_available():
        torch.cuda.set_device(local_rank)

    print(f"[Rank {local_rank}] Initialized with backend={backend}, world_size={world_size}")
    sys.stdout.flush()  # Ensure logs are visible in Kubernetes
    return local_rank, world_size, device

def cleanup():
    """Clean up the distributed process group."""
    dist.destroy_process_group()

class SimpleModel(nn.Module):
    """A simple model with multiple layers."""
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.layer1 = nn.Linear(1024, 512)
        self.layer2 = nn.Linear(512, 256)
        self.layer3 = nn.Linear(256, 128)
        self.layer4 = nn.Linear(128, 64)
        self.output = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.layer1(x))
        x = torch.relu(self.layer2(x))
        x = torch.relu(self.layer3(x))
        x = torch.relu(self.layer4(x))
        return self.output(x)

def log_ddp_parameters(model, rank):
    """Log model parameter count for DDP."""
    num_params = sum(p.numel() for p in model.parameters())
    print(f"[Rank {rank}] Model has {num_params} parameters (replicated across all ranks)")
    sys.stdout.flush()

def log_memory_usage(rank):
    """Log GPU memory usage if CUDA is available."""
    if torch.cuda.is_available():
        torch.cuda.synchronize()
        print(f"[Rank {rank}] GPU Memory Allocated: {torch.cuda.memory_allocated() / 1e6} MB")
        print(f"[Rank {rank}] GPU Memory Reserved: {torch.cuda.memory_reserved() / 1e6} MB")
        sys.stdout.flush()

def main():
    local_rank, world_size, device = setup_ddp()

    # Initialize model and wrap with DDP
    model = SimpleModel().to(device)
    model = DDP(model, device_ids=[local_rank] if torch.cuda.is_available() else None)

    print(f"[Rank {local_rank}] DDP Initialized")
    log_ddp_parameters(model, local_rank)
    log_memory_usage(local_rank)

    # Optimizer and criterion
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    # Dummy dataset (adjust for real-world use case)
    x = torch.randn(32, 1024).to(device)
    y = torch.randn(32, 1).to(device)

    # Training loop
    for epoch in range(5):
        model.train()
        optimizer.zero_grad()

        # Forward pass
        outputs = model(x)
        loss = criterion(outputs, y)

        # Backward pass
        loss.backward()
        optimizer.step()

        print(f"[Rank {local_rank}] Epoch {epoch}, Loss: {loss.item()}")
        log_memory_usage(local_rank)  # Track memory usage

        sys.stdout.flush()  # Ensure logs appear in real-time

    cleanup()

if __name__ == "__main__":
    main()

4.1.4.3. Example Training Operator PyTorch training script: FSDP

This example shows how to configure a training script for a Fully Sharded Data Parallel (FSDP) training job.

import os
import sys
import torch
import torch.distributed as dist
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP, CPUOffload
from torch.distributed.fsdp.wrap import always_wrap_policy
from torch import nn, optim

# Enable verbose logging for debugging
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "INFO"  # Enables detailed FSDP logs

def setup_ddp():
    """Initialize the distributed process group dynamically."""
    backend = "nccl" if torch.cuda.is_available() else "gloo"
    dist.init_process_group(backend=backend)
    local_rank = int(os.environ["LOCAL_RANK"])
    world_size = dist.get_world_size()

    # Ensure the correct device is set
    device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu")
    if torch.cuda.is_available():
        torch.cuda.set_device(local_rank)

    print(f"[Rank {local_rank}] Initialized with backend={backend}, world_size={world_size}")
    sys.stdout.flush()  # Ensure logs are visible in Kubernetes
    return local_rank, world_size, device

def cleanup():
    """Clean up the distributed process group."""
    dist.destroy_process_group()

class SimpleModel(nn.Module):
    """A simple model with multiple layers."""
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.layer1 = nn.Linear(1024, 512)
        self.layer2 = nn.Linear(512, 256)
        self.layer3 = nn.Linear(256, 128)
        self.layer4 = nn.Linear(128, 64)
        self.output = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.layer1(x))
        x = torch.relu(self.layer2(x))
        x = torch.relu(self.layer3(x))
        x = torch.relu(self.layer4(x))
        return self.output(x)

def log_fsdp_parameters(model, rank):
    """Log FSDP parameters and sharding strategy."""
    num_params = sum(p.numel() for p in model.parameters())
    print(f"[Rank {rank}] Model has {num_params} parameters (sharded across {dist.get_world_size()} workers)")
    sys.stdout.flush()

def log_memory_usage(rank):
    """Log GPU memory usage if CUDA is available."""
    if torch.cuda.is_available():
        torch.cuda.synchronize()
        print(f"[Rank {rank}] GPU Memory Allocated: {torch.cuda.memory_allocated() / 1e6} MB")
        print(f"[Rank {rank}] GPU Memory Reserved: {torch.cuda.memory_reserved() / 1e6} MB")
        sys.stdout.flush()

def main():
    local_rank, world_size, device = setup_ddp()

    # Initialize model and wrap with FSDP
    model = SimpleModel().to(device)
    model = FSDP(
        model,
        cpu_offload=CPUOffload(offload_params=not torch.cuda.is_available()),  # Offload if no GPU
        auto_wrap_policy=always_wrap_policy,  # Wrap all layers automatically
    )

    print(f"[Rank {local_rank}] FSDP Initialized")
    log_fsdp_parameters(model, local_rank)
    log_memory_usage(local_rank)

    # Optimizer and criterion
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    # Dummy dataset (adjust for real-world use case)
    x = torch.randn(32, 1024).to(device)
    y = torch.randn(32, 1).to(device)

    # Training loop
    for epoch in range(5):
        model.train()
        optimizer.zero_grad()

        # Forward pass
        outputs = model(x)
        loss = criterion(outputs, y)

        # Backward pass
        loss.backward()
        optimizer.step()

        print(f"[Rank {local_rank}] Epoch {epoch}, Loss: {loss.item()}")
        log_memory_usage(local_rank)  # Track memory usage

        sys.stdout.flush()  # Ensure logs appear in real-time

    cleanup()

if __name__ == "__main__":
    main()
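As a rough illustration of what "sharded across workers" means in log_fsdp_parameters, you can compute the SimpleModel parameter count by hand and divide it across ranks. This is a back-of-the-envelope sketch; actual FSDP sharding also depends on the wrap policy and padding:

```python
# Parameter count of the SimpleModel defined above: each nn.Linear(d_in, d_out)
# holds d_in * d_out weights plus d_out biases.
layers = [(1024, 512), (512, 256), (256, 128), (128, 64), (64, 1)]
total = sum(d_in * d_out + d_out for d_in, d_out in layers)
print(f"total parameters: {total}")          # 697345

# Under FSDP, each of world_size ranks stores roughly total / world_size
# parameters, instead of a full replica as in DDP.
world_size = 4                               # illustrative value
print(f"approx. parameters per rank: {total // world_size}")
```

With four ranks, each worker holds roughly a quarter of the parameters, which is the memory saving that FSDP provides over DDP's full per-rank replica.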

You can use this example Dockerfile to include the training script in a custom training image.

FROM quay.io/modh/training:py311-cuda124-torch251
WORKDIR /workspace
COPY train.py /workspace/train.py
CMD ["python", "train.py"]

This example copies the training script to the default PyTorch image, and runs the script.

For more information about how to use this Dockerfile to include the training script in a custom container image, see Creating a custom training image.

This example shows how to create a Training Operator PyTorch training job that runs on multiple nodes with multiple GPUs.

apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-multi-node-job
  namespace: test-namespace
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        metadata:
          labels:
            app: pytorch-multi-node-job
        spec:
          containers:
          - name: pytorch
            image: quay.io/modh/training:py311-cuda124-torch251
            imagePullPolicy: IfNotPresent
            command: ["torchrun", "/workspace/train.py"]
            volumeMounts:
              - name: training-script-volume
                mountPath: /workspace
            resources:
              requests:
                cpu: "4"
                memory: "8Gi"
                nvidia.com/gpu: "2"
              limits:
                cpu: "4"
                memory: "8Gi"
                nvidia.com/gpu: "2"
          volumes:
            - name: training-script-volume
              configMap:
                name: training-script-configmap
    Worker:
      replicas: 1
      restartPolicy: OnFailure
      template:
        metadata:
          labels:
            app: pytorch-multi-node-job
        spec:
          containers:
          - name: pytorch
            image: quay.io/modh/training:py311-cuda124-torch251
            imagePullPolicy: IfNotPresent
            command: ["torchrun", "/workspace/train.py"]
            volumeMounts:
              - name: training-script-volume
                mountPath: /workspace
            resources:
              requests:
                cpu: "4"
                memory: "8Gi"
                nvidia.com/gpu: "2"
              limits:
                cpu: "4"
                memory: "8Gi"
                nvidia.com/gpu: "2"
          volumes:
            - name: training-script-volume
              configMap:
                name: training-script-configmap
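For context, the Training Operator injects the PyTorch rendezvous environment variables (MASTER_ADDR, MASTER_PORT, WORLD_SIZE, and RANK) into each PyTorchJob pod, which is why the example scripts can call dist.init_process_group() without explicit arguments. A small sketch for inspecting these variables from inside a pod:

```python
import os

def rendezvous_env(env=os.environ):
    """Collect the PyTorch rendezvous variables set in each training pod."""
    keys = ("MASTER_ADDR", "MASTER_PORT", "WORLD_SIZE", "RANK")
    return {key: env.get(key, "<not set>") for key in keys}

# Outside a training pod, these variables are typically not set.
print(rendezvous_env())
```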

You can use the Training Operator SDK to configure a distributed training job to run on multiple nodes with multiple accelerators per node.

You can configure the PyTorchJob resource so that the training job runs on multiple nodes with multiple GPUs.

4.2.1. Configuring a training job by using the Training Operator SDK

Before you can run a job to train a model, you must configure the training job. You must set the training parameters, define the training function, and configure the Training Operator SDK.

Note

The code in this procedure specifies how to configure an example training job. If you have the specified resources, you can run the example code without editing it.

Alternatively, you can modify the example code to specify the appropriate configuration for your training job.

Prerequisites

  • You can access an OpenShift cluster that has sufficient worker nodes with supported accelerators to run your training or tuning job.
  • You can access a workbench that is suitable for distributed training, as described in Creating a workbench for distributed training.
  • You have administrator access for the project.

    • If you created the project, you automatically have administrator access.
    • If you did not create the project, your cluster administrator must give you administrator access.

Procedure

  1. Open the workbench, as follows:

    1. Log in to the Red Hat OpenShift AI web console.
    2. Click Projects and click your project.
    3. Click the Workbenches tab.
    4. If your workbench is not already running, start the workbench.
    5. Click the Open link to open the IDE in a new window.
  2. Click File > New Notebook.
  3. Create the training function as shown in the following example:

    1. Create a cell with the following content:

      Example training function

      def train_func():
          import os
          import torch
          import torch.distributed as dist
      
          # Select backend dynamically: nccl for GPU, gloo for CPU
          backend = "nccl" if torch.cuda.is_available() else "gloo"
      
          # Initialize the process group
          dist.init_process_group(backend)
      
          # Get rank and world size
          rank = dist.get_rank()
          world_size = dist.get_world_size()
      
          # Select device dynamically
          device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
      
          # Log rank initialization
          print(f"Rank {rank}/{world_size} initialized with backend {backend} on device {device}.")
      
          # Initialize tensor on the selected device
          tensor = torch.zeros(1, device=device)
      
          if rank == 0:
              tensor += 1
              for i in range(1, world_size):
                  dist.send(tensor, dst=i)
          else:
              dist.recv(tensor, src=0)
      
          print(f"Rank {rank}: Tensor value {tensor.item()} on {device}")
      
          # Cleanup
          dist.destroy_process_group()

      Note

      For this example training job, you do not need to install any additional packages or set any training parameters.

      For more information about how to add additional packages and set the training parameters, see Configuring the fine-tuning job.

    2. Optional: Edit the content to specify the appropriate values for your environment.
    3. Run the cell to create the training function.
  4. Configure the Training Operator SDK client authentication as follows:

    1. Create a cell with the following content:

      Example Training Operator SDK client authentication

      from kubernetes import client
      from kubeflow.training import TrainingClient
      from kubeflow.training.models import V1Volume, V1VolumeMount, V1PersistentVolumeClaimVolumeSource
      
      api_server = "<API_SERVER>"
      token = "<TOKEN>"
      
      configuration = client.Configuration()
      configuration.host = api_server
      configuration.api_key = {"authorization": f"Bearer {token}"}
      # Uncomment if your cluster API server uses a self-signed certificate or an untrusted CA
      #configuration.verify_ssl = False
      api_client = client.ApiClient(configuration)
      client = TrainingClient(client_configuration=api_client.configuration)

    2. Edit the api_server and token parameters to enter the values to authenticate to your OpenShift cluster.

      For information on how to find the server and token details, see Using the cluster server and token to authenticate.

    3. Run the cell to configure the Training Operator SDK client authentication.
  5. Click File > Save Notebook As, enter an appropriate file name, and click Save.

Verification

  1. All cells run successfully.

4.2.2. Running a training job by using the Training Operator SDK

When you run a training job to tune a model, you must specify the resources needed, and provide any authorization credentials required.

Note

The code in this procedure specifies how to run the example training job. If you have the specified resources, you can run the example code without editing it.

Alternatively, you can modify the example code to specify the appropriate details for your training job.

Prerequisites

  • You can access an OpenShift cluster that has sufficient worker nodes with supported accelerators to run your training or tuning job.
  • You can access a workbench that is suitable for distributed training, as described in Creating a workbench for distributed training.
  • You have administrator access for the project.

    • If you created the project, you automatically have administrator access.
    • If you did not create the project, your cluster administrator must give you administrator access.
  • You have enabled your project for Kueue management by applying the kueue.openshift.io/managed=true label to the project namespace.
  • You have created resource flavor, cluster queue, and local queue Kueue objects for your project. For more information about creating these objects, see Configuring quota management for distributed workloads.
  • You have access to a model.
  • You have access to data that you can use to train the model.
  • You have configured the training job as described in Configuring a training job by using the Training Operator SDK.

Procedure

  1. Open the workbench, as follows:

    1. Log in to the Red Hat OpenShift AI web console.
    2. Click Projects and click your project.
    3. Click the Workbenches tab. If your workbench is not already running, start the workbench.
    4. Click the Open link to open the IDE in a new window.
  2. Click File > Open, and open the Jupyter notebook that you used to configure the training job.
  3. Create a cell to run the job, and add the following content:

    # `client` is the TrainingClient instance that you created in the authentication cell.
    
    # Start a PyTorchJob with 2 workers and 2 GPUs per worker (multi-node, multi-GPU job).
    client.create_job(
       name="pytorch-ddp",
       train_func=train_func,
       base_image="quay.io/modh/training:py311-cuda124-torch251",
       num_workers=2,
       resources_per_worker={"nvidia.com/gpu": "2"},
       packages_to_install=["torchvision==0.19.0"],
       env_vars={"NCCL_DEBUG": "INFO", "TORCH_DISTRIBUTED_DEBUG": "DETAIL"},
       labels={
            "kueue.x-k8s.io/queue-name": "<local-queue-name>",
            "key": "value"
        },
       annotations={"key": "value"}
    )
  4. Edit the content to specify the appropriate values for your environment, as follows:

    1. Edit the num_workers value to specify the number of worker nodes.
    2. Update the resources_per_worker values according to the job requirements and the resources available.
    3. Edit the value of the kueue.x-k8s.io/queue-name label to match the name of your target LocalQueue.
    4. The example provided is for NVIDIA GPUs. If you use AMD accelerators, make the following additional changes:

      • In the resources_per_worker entry, change nvidia.com/gpu to amd.com/gpu
      • Change the base_image value to quay.io/modh/training:py311-rocm62-torch251
      • Remove the NCCL_DEBUG entry

If the job_kind value is not explicitly set, the TrainingClient API automatically sets the job_kind value to PyTorchJob.

  1. Run the cell to run the job.

Verification

View the progress of the job as follows:

  1. Create a cell with the following content:

    client.get_job_logs(
        name="pytorch-ddp",
        job_kind="PyTorchJob",
        follow=True,
    )
  2. Run the cell to view the job progress.

4.3. Fine-tuning a model by using Kubeflow Training

Supervised fine-tuning (SFT) is the process of customizing a Large Language Model (LLM) for a specific task by using labelled data. In this example, you use the Kubeflow Training Operator and Kubeflow Training Operator Python Software Development Kit (Training Operator SDK) to run supervised fine-tuning of an LLM in Red Hat OpenShift AI, by using the Hugging Face SFT Trainer.

Optionally, you can use Low-Rank Adaptation (LoRA) to efficiently fine-tune large language models. LoRA optimizes computational requirements and reduces memory footprint, enabling you to fine-tune on consumer-grade GPUs. With SFT, you can combine PyTorch Fully Sharded Data Parallel (FSDP) and LoRA to enable scalable, cost-effective model training and inference, enhancing the flexibility and performance of AI workloads within OpenShift environments.
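To make the memory saving concrete, compare trainable parameter counts for a single weight matrix. This is a back-of-the-envelope sketch; the 4096x4096 dimensions are illustrative, and the rank of 16 matches the lora_r value used in the example parameters later in this chapter:

```python
def lora_trainable_params(d_in: int, d_out: int, rank: int) -> int:
    # LoRA freezes the full d_in x d_out weight matrix and trains two
    # low-rank factors instead: A (d_in x rank) and B (rank x d_out).
    return rank * (d_in + d_out)

full = 4096 * 4096                            # full fine-tuning of one projection
lora = lora_trainable_params(4096, 4096, 16)  # LoRA adapter for the same layer
print(full, lora, f"{lora / full:.2%}")       # LoRA trains under 1% as many weights
```

Because only the adapter weights need gradients and optimizer state, the optimizer memory shrinks proportionally, which is what makes fine-tuning feasible on smaller GPUs.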

4.3.1. Configuring the fine-tuning job

Before you can use a training job to fine-tune a model, you must configure the training job. You must set the training parameters, define the training function, and configure the Training Operator SDK.

Note

The code in this procedure specifies how to configure an example fine-tuning job. If you have the specified resources, you can run the example code without editing it.

Alternatively, you can modify the example code to specify the appropriate configuration for your fine-tuning job.

Prerequisites

  • You can access an OpenShift cluster that has sufficient worker nodes with supported accelerators to run your training or tuning job.

    The example fine-tuning job requires 8 worker nodes, where each worker node has 64 GiB memory, 4 CPUs, and 1 NVIDIA GPU.

  • You can access a workbench that is suitable for distributed training, as described in Creating a workbench for distributed training.
  • You can access a dynamic storage provisioner that supports ReadWriteMany (RWX) Persistent Volume Claim (PVC) provisioning, such as Red Hat OpenShift Data Foundation.
  • You have administrator access for the project.

    • If you created the project, you automatically have administrator access.
    • If you did not create the project, your cluster administrator must give you administrator access.

Procedure

  1. Open the workbench, as follows:

    1. Log in to the Red Hat OpenShift AI web console.
    2. Click Projects and click your project.
    3. Click the Workbenches tab.
    4. Ensure that the workbench uses a storage class with RWX capability.
    5. If your workbench is not already running, start the workbench.
    6. Click the Open link to open the IDE in a new window.
  2. Click File > New Notebook.
  3. Install any additional packages that are needed to run the training or tuning job.

    1. In a notebook cell, add the code to install the additional packages, as follows:

      Code to install dependencies

      # Install the yamlmagic package
      !pip install yamlmagic
      %load_ext yamlmagic
      
      !pip install git+https://github.com/kubeflow/trainer.git@release-1.9#subdirectory=sdk/python

    2. Select the cell, and click Run > Run selected cell.

      The additional packages are installed.

  4. Set the training parameters as follows:

    1. Create a cell with the following content:

      %%yaml parameters
      
      # Model
      model_name_or_path: Meta-Llama/Meta-Llama-3.1-8B-Instruct
      model_revision: main
      torch_dtype: bfloat16
      attn_implementation: flash_attention_2
      
      # PEFT / LoRA
      use_peft: true
      lora_r: 16
      lora_alpha: 8
      lora_dropout: 0.05
      lora_target_modules: ["q_proj", "v_proj", "k_proj", "o_proj", "gate_proj", "up_proj", "down_proj"]
      lora_modules_to_save: []
      init_lora_weights: true
      
      # Quantization / BitsAndBytes
      load_in_4bit: false                       # use 4 bit precision for the base model (only with LoRA)
      load_in_8bit: false                       # use 8 bit precision for the base model (only with LoRA)
      
      # Datasets
      dataset_name: gsm8k                       # id or path to the dataset
      dataset_config: main                      # name of the dataset configuration
      dataset_train_split: train                # dataset split to use for training
      dataset_test_split: test                  # dataset split to use for evaluation
      dataset_text_field: text                  # name of the text field of the dataset
      dataset_kwargs:
        add_special_tokens: false               # template with special tokens
        append_concat_token: false              # add additional separator token
      
      # SFT
      max_seq_length: 1024                      # max sequence length for model and packing of the dataset
      dataset_batch_size: 1000                  # samples to tokenize per batch
      packing: false
      use_liger: false
      
      # Training
      num_train_epochs: 10                      # number of training epochs
      
      per_device_train_batch_size: 32           # batch size per device during training
      per_device_eval_batch_size: 32            # batch size for evaluation
      auto_find_batch_size: false               # find a batch size that fits into memory automatically
      eval_strategy: epoch                      # evaluate every epoch
      
      bf16: true                                # use bf16 16-bit (mixed) precision
      tf32: false                               # use tf32 precision
      
      learning_rate: 1.0e-4                     # initial learning rate
      warmup_steps: 10                          # steps for a linear warmup from 0 to `learning_rate`
      lr_scheduler_type: inverse_sqrt           # learning rate scheduler (see transformers.SchedulerType)
      
      optim: adamw_torch_fused                  # optimizer (see transformers.OptimizerNames)
      max_grad_norm: 1.0                        # max gradient norm
      seed: 42
      
      gradient_accumulation_steps: 1            # number of steps before performing a backward/update pass
      gradient_checkpointing: false             # use gradient checkpointing to save memory
      gradient_checkpointing_kwargs:
        use_reentrant: false
      
      # FSDP
      fsdp: "full_shard auto_wrap offload"      # remove offload if enough GPU memory
      fsdp_config:
        activation_checkpointing: true
        cpu_ram_efficient_loading: false
        sync_module_states: true
        use_orig_params: true
        limit_all_gathers: false
      
      # Checkpointing
      save_strategy: epoch                      # save checkpoint every epoch
      save_total_limit: 1                       # limit the total amount of checkpoints
      resume_from_checkpoint: false             # load the last checkpoint in output_dir and resume from it
      
      # Logging
      log_level: warning                        # logging level (see transformers.logging)
      logging_strategy: steps
      logging_steps: 1                          # log every N steps
      report_to:
      - tensorboard                             # report metrics to tensorboard
      
      output_dir: /mnt/shared/Meta-Llama-3.1-8B-Instruct
    2. Optional: If you specify a different model or dataset, edit the parameters to suit your model, dataset, and resources. If necessary, update the previous cell to specify the dependencies for your training or tuning job.
    3. Run the cell to set the training parameters.
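When you adjust the batch-size parameters, keep in mind that the effective global batch size is the product of per_device_train_batch_size, the total number of GPUs across all workers, and gradient_accumulation_steps. The following is a minimal sketch of that arithmetic, using the values from the example configuration (8 workers with 1 GPU each):

```python
# Effective global batch size = per-device batch * total GPUs * accumulation steps.
# The values below mirror the example configuration in this procedure.
def effective_batch_size(per_device_batch, num_workers, gpus_per_worker, grad_accum_steps):
    return per_device_batch * num_workers * gpus_per_worker * grad_accum_steps

global_batch = effective_batch_size(
    per_device_batch=32,   # per_device_train_batch_size
    num_workers=8,         # number of worker nodes
    gpus_per_worker=1,     # nvidia.com/gpu per worker
    grad_accum_steps=1,    # gradient_accumulation_steps
)
print(global_batch)  # 256
```

If you change num_workers or the number of GPUs per worker, adjust per_device_train_batch_size or the learning rate accordingly to keep training behavior comparable.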
  5. Create the training function as follows:

    1. Create a cell with the following content:

      def main(parameters):
          import random
      
          from datasets import load_dataset
          from transformers import (
              AutoTokenizer,
              set_seed,
          )
      
          from trl import (
              ModelConfig,
              ScriptArguments,
              SFTConfig,
              SFTTrainer,
              TrlParser,
              get_peft_config,
              get_quantization_config,
              get_kbit_device_map,
          )
      
          parser = TrlParser((ScriptArguments, SFTConfig, ModelConfig))
          script_args, training_args, model_args = parser.parse_dict(parameters)
      
          # Set seed for reproducibility
          set_seed(training_args.seed)
      
          # Model and tokenizer
          quantization_config = get_quantization_config(model_args)
          model_kwargs = dict(
              revision=model_args.model_revision,
              trust_remote_code=model_args.trust_remote_code,
              attn_implementation=model_args.attn_implementation,
              torch_dtype=model_args.torch_dtype,
              use_cache=False if training_args.gradient_checkpointing or
                                 training_args.fsdp_config.get("activation_checkpointing",
                                                               False) else True,
              device_map=get_kbit_device_map() if quantization_config is not None else None,
              quantization_config=quantization_config,
          )
          training_args.model_init_kwargs = model_kwargs
          tokenizer = AutoTokenizer.from_pretrained(
              model_args.model_name_or_path, trust_remote_code=model_args.trust_remote_code, use_fast=True
          )
          if tokenizer.pad_token is None:
              tokenizer.pad_token = tokenizer.eos_token
      
          # You can override the template here according to your use case
          # tokenizer.chat_template = ...
      
          # Datasets
          train_dataset = load_dataset(
              path=script_args.dataset_name,
              name=script_args.dataset_config,
              split=script_args.dataset_train_split,
          )
          test_dataset = None
          if training_args.eval_strategy != "no":
              test_dataset = load_dataset(
                  path=script_args.dataset_name,
                  name=script_args.dataset_config,
                  split=script_args.dataset_test_split,
              )
      
          # Templatize datasets
          def template_dataset(sample):
              # For datasets that already provide chat messages, use instead:
              # return {"text": tokenizer.apply_chat_template(sample["messages"], tokenize=False)}
              messages = [
                  {"role": "user", "content": sample["question"]},
                  {"role": "assistant", "content": sample["answer"]},
              ]
              return {"text": tokenizer.apply_chat_template(messages, tokenize=False)}
      
          train_dataset = train_dataset.map(template_dataset, remove_columns=["question", "answer"])
          if training_args.eval_strategy != "no":
              # test_dataset = test_dataset.map(template_dataset, remove_columns=["messages"])
              test_dataset = test_dataset.map(template_dataset, remove_columns=["question", "answer"])
      
          # Check random samples
          with training_args.main_process_first(
              desc="Log few samples from the training set"
          ):
              for index in random.sample(range(len(train_dataset)), 2):
                  print(train_dataset[index]["text"])
      
          # Training
          trainer = SFTTrainer(
              model=model_args.model_name_or_path,
              args=training_args,
              train_dataset=train_dataset,
              eval_dataset=test_dataset,
              peft_config=get_peft_config(model_args),
              tokenizer=tokenizer,
          )
      
          if trainer.accelerator.is_main_process and hasattr(trainer.model, "print_trainable_parameters"):
              trainer.model.print_trainable_parameters()
      
          checkpoint = None
          if training_args.resume_from_checkpoint is not None:
              checkpoint = training_args.resume_from_checkpoint
      
          trainer.train(resume_from_checkpoint=checkpoint)
      
          trainer.save_model(training_args.output_dir)
      
          with training_args.main_process_first(desc="Training completed"):
              print(f"Training completed, model checkpoint written to {training_args.output_dir}")
    2. Optional: If you specify a different model or dataset, edit the tokenizer.chat_template parameter to specify the appropriate value for your model and dataset.
    3. Run the cell to create the training function.
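Inside the training function, template_dataset maps each GSM8K sample onto a user/assistant message pair, which the tokenizer then renders with the model's chat template. The following model-agnostic sketch illustrates that mapping; the render_chat function here is a hypothetical stand-in for tokenizer.apply_chat_template, whose actual output format is defined by the model's tokenizer:

```python
# Hypothetical stand-in for tokenizer.apply_chat_template: the real template
# is model-specific and comes from the tokenizer, not from this sketch.
def render_chat(messages):
    return "\n".join(f"<|{m['role']}|> {m['content']}" for m in messages)

def template_dataset(sample):
    # Map a GSM8K-style sample onto a chat-style message pair.
    messages = [
        {"role": "user", "content": sample["question"]},
        {"role": "assistant", "content": sample["answer"]},
    ]
    return {"text": render_chat(messages)}

sample = {"question": "What is 2 + 2?", "answer": "2 + 2 = 4."}
print(template_dataset(sample)["text"])
```

If your dataset uses different column names, update both the messages construction and the remove_columns arguments in the training function to match.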
  6. Configure the Training Operator SDK client authentication as follows:

    1. Create a cell with the following content:

      from kubernetes import client
      from kubeflow.training import TrainingClient
      from kubeflow.training.models import V1Volume, V1VolumeMount, V1PersistentVolumeClaimVolumeSource
      
      api_server = "<API_SERVER>"
      token = "<TOKEN>"
      
      configuration = client.Configuration()
      configuration.host = api_server
      configuration.api_key = {"authorization": f"Bearer {token}"}
      # Uncomment if your cluster API server uses a self-signed certificate or an untrusted CA
      #configuration.verify_ssl = False
      api_client = client.ApiClient(configuration)
      client = TrainingClient(client_configuration=api_client.configuration)
    2. Edit the api_server and token parameters to enter the values to authenticate to your OpenShift cluster.

      For information about how to find the server and token details, see Using the cluster server and token to authenticate.

    3. Run the cell to configure the Training Operator SDK client authentication.
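Rather than hard-coding credentials in the notebook, you can read them from environment variables. The following is a minimal sketch; the variable names OPENSHIFT_API_SERVER and OPENSHIFT_TOKEN are arbitrary choices for this example, not names that the SDK recognizes:

```python
import os

# Arbitrary example variable names: export them in the environment before
# starting the workbench, or fall back to the placeholders from this procedure.
api_server = os.environ.get("OPENSHIFT_API_SERVER", "<API_SERVER>")
token = os.environ.get("OPENSHIFT_TOKEN", "<TOKEN>")
```

This keeps the token out of the saved notebook file, which is useful if you share or version-control the notebook.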
  7. Click File > Save Notebook As, enter an appropriate file name, and click Save.

Verification

  1. All cells run successfully.

4.3.2. Running the fine-tuning job

When you run a training job to tune a model, you must specify the resources needed, and provide any authorization credentials required.

Note

The code in this procedure specifies how to run the example fine-tuning job. If you have the specified resources, you can run the example code without editing it.

Alternatively, you can modify the example code to specify the appropriate details for your fine-tuning job.

Prerequisites

  • You can access an OpenShift cluster that has sufficient worker nodes with supported accelerators to run your training or tuning job.

    The example fine-tuning job requires 8 worker nodes, where each worker node has 64 GiB memory, 4 CPUs, and 1 NVIDIA GPU.

  • You can access a workbench that is suitable for distributed training, as described in Creating a workbench for distributed training.
  • You have administrator access for the project.

    • If you created the project, you automatically have administrator access.
    • If you did not create the project, your cluster administrator must give you administrator access.
  • You have access to a model.
  • You have access to data that you can use to train the model.
  • You have configured the fine-tuning job as described in Configuring the fine-tuning job.
  • You can access a dynamic storage provisioner that supports ReadWriteMany (RWX) Persistent Volume Claim (PVC) provisioning, such as Red Hat OpenShift Data Foundation.
  • A PersistentVolumeClaim resource named shared with RWX access mode is attached to your workbench.
  • You have a Hugging Face account and access token. For more information, search for "user access tokens" in the Hugging Face documentation.

Procedure

  1. Open the workbench, as follows:

    1. Log in to the Red Hat OpenShift AI web console.
    2. Click Projects and click your project.
    3. Click the Workbenches tab. If your workbench is not already running, start the workbench.
    4. Click the Open link to open the IDE in a new window.
  2. Click File > Open, and open the Jupyter notebook that you used to configure the fine-tuning job.
  3. Create a cell to run the job, and add the following content:

    client.create_job(
        job_kind="PyTorchJob",
        name="sft",
        train_func=main,
        num_workers=8,
        num_procs_per_worker="1",
        resources_per_worker={
            "nvidia.com/gpu": 1,
            "memory": "64Gi",
            "cpu": 4,
        },
        base_image="quay.io/modh/training:py311-cuda124-torch251",
        env_vars={
            # Hugging Face
            "HF_HOME": "/mnt/shared/.cache",
            "HF_TOKEN": "",
            # CUDA
            "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True",
            # NCCL
            "NCCL_DEBUG": "INFO",
            "NCCL_ENABLE_DMABUF_SUPPORT": "1",
        },
        packages_to_install=[
            "tensorboard",
        ],
        parameters=parameters,
        volumes=[
            V1Volume(name="shared",
                persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name="shared")),
        ],
        volume_mounts=[
            V1VolumeMount(name="shared", mount_path="/mnt/shared"),
        ],
    )
  4. Edit the HF_TOKEN value to specify your Hugging Face access token.

    Optional: If you specify a different model, and your model is not a gated model from the Hugging Face Hub, remove the HF_HOME and HF_TOKEN entries.

  5. Optional: Edit the other content to specify the appropriate values for your environment, as follows:

    1. Edit the num_workers value to specify the number of worker nodes.
    2. Update the resources_per_worker values according to the job requirements and the resources available.
    3. The example provided is for NVIDIA GPUs. If you use AMD accelerators, make the following additional changes:

      • In the resources_per_worker entry, change nvidia.com/gpu to amd.com/gpu
      • Change the base_image value to quay.io/modh/training:py311-rocm62-torch251
      • Remove the CUDA and NCCL entries
    4. If the RWX PersistentVolumeClaim resource that is attached to your workbench has a name other than shared, update the following values to replace shared with your PVC name:

      • In this cell, update the HF_HOME value.
      • In this cell, in the volumes entry, update the PVC details:

        • In the V1Volume entry, update the name and claim_name values.
        • In the volume_mounts entry, update the name and mount_path values.
      • In the cell where you set the training parameters, update the output_dir value.

        For more information about setting the training parameters, see Configuring the fine-tuning job.

  6. Run the cell to run the job.
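Before you run the job, you can sanity-check that the cluster has capacity for the aggregate request: it is the per-worker request multiplied by num_workers. A quick sketch using the example values (memory is expressed as a plain GiB number here to keep the arithmetic simple):

```python
# Aggregate resource demand = per-worker request * number of workers.
# memory_gib is a simplification of the "64Gi" quantity string.
resources_per_worker = {"nvidia.com/gpu": 1, "memory_gib": 64, "cpu": 4}
num_workers = 8

total = {key: value * num_workers for key, value in resources_per_worker.items()}
print(total)  # {'nvidia.com/gpu': 8, 'memory_gib': 512, 'cpu': 32}
```

In this example, the job requests 8 GPUs, 512 GiB of memory, and 32 CPUs in total across the cluster.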

Verification

View the progress of the job as follows:

  1. Create a cell with the following content:

    client.get_job_logs(
        name="sft",
        job_kind="PyTorchJob",
        follow=True,
    )
  2. Run the cell to view the job progress.

4.3.3. Deleting the fine-tuning job

When you no longer need the fine-tuning job, delete the job to release the resources.

Note

The code in this procedure specifies how to delete the example fine-tuning job. If you created the example fine-tuning job named sft, you can run the example code without editing it.

Alternatively, you can modify this example code to specify the name of your fine-tuning job.

Prerequisites

Procedure

  1. Open the workbench, as follows:

    1. Log in to the Red Hat OpenShift AI web console.
    2. Click Projects and click your project.
    3. Click the Workbenches tab. If your workbench is not already running, start the workbench.
    4. Click the Open link to open the IDE in a new window.
  2. Click File > Open, and open the Jupyter notebook that you used to configure and run the example fine-tuning job.
  3. Create a cell with the following content:

    client.delete_job(name="sft")
  4. Optional: If you want to delete a different job, edit the content to replace sft with the name of your job.
  5. Run the cell to delete the job.

Verification

  1. In the OpenShift Console, in the Administrator perspective, click Workloads > Jobs.
  2. From the Project list, select your project.
  3. Verify that the specified job is not listed.

4.4. Creating a multi-node PyTorch training job with RDMA

NVIDIA GPUDirect RDMA uses Remote Direct Memory Access (RDMA) to provide direct GPU interconnect, enabling peripheral devices to access NVIDIA GPU memory in remote systems directly. RDMA improves the training job performance because it eliminates the overhead of using the operating system CPUs and memory. Running a training job on multiple nodes using multiple GPUs can significantly reduce the completion time.

In Red Hat OpenShift AI, NVIDIA GPUs can communicate directly by using GPUDirect RDMA across the following types of network:

  • Ethernet: RDMA over Converged Ethernet (RoCE)
  • InfiniBand

Before you create a PyTorch training job in a cluster configured for RDMA, you must configure the job to use the high-speed network interfaces.

Prerequisites

Procedure

  1. Log in to the OpenShift Console.
  2. Create a PyTorchJob resource, as follows:

    1. In the Administrator perspective, click Home > Search.
    2. From the Project list, select your project.
    3. Click the Resources list, and in the search field, start typing PyTorchJob.
    4. Select PyTorchJob, and click Create PyTorchJob.

      The Create PyTorchJob page opens, with default YAML code automatically added.

  3. Attach the high-speed network interface to the PyTorchJob pods, as follows:

    1. Edit the PyTorchJob resource YAML code to include an annotation that adds the pod to an additional network, as shown in the following example:

      Example annotation to attach network interface to pod

      spec:
        pytorchReplicaSpecs:
          Master:
            replicas: 1
            restartPolicy: OnFailure
            template:
              metadata:
                annotations:
                  k8s.v1.cni.cncf.io/networks: "example-net"

    2. Replace the example network name example-net with the appropriate value for your configuration.
  4. Configure the job to use NVIDIA Collective Communications Library (NCCL) interfaces, as follows:

    1. Edit the PyTorchJob resource YAML code to add the following environment variables:

      Example environment variables

              spec:
                containers:
                - command:
                  - /bin/bash
                  - -c
                  - "your container command"
                  env:
                  - name: NCCL_SOCKET_IFNAME
                    value: "net1"
                  - name: NCCL_IB_HCA
                    value: "mlx5_1"

    2. Replace the example environment-variable values with the appropriate values for your configuration:

      1. Set the NCCL_SOCKET_IFNAME environment variable to specify the IP interface to use for communication.
      2. Optional: To explicitly specify the Host Channel Adapter (HCA) that NCCL uses, set the NCCL_IB_HCA environment variable.
  5. Specify the base training image name, as follows:

    1. Edit the PyTorchJob resource YAML code to add the following text:

      Example base training image

      image: quay.io/modh/training:py311-cuda124-torch251

    2. If you want to use a different base training image, replace the image name accordingly.

      For a list of supported training images, see Supported Configurations for 3.x.

  6. Specify the requests and limits for the network interface resources.

    The name of the resource varies, depending on the NVIDIA Network Operator configuration. The resource name might depend on the deployment mode, and is specified in the NicClusterPolicy resource.

    Note

    You must use the resource name that matches your configuration. The name must correspond to the value advertised by the NVIDIA Network Operator on the cluster nodes.

    The following example is for RDMA over Converged Ethernet (RoCE), where the Ethernet RDMA devices are using the RDMA shared device mode.

    1. Review the NicClusterPolicy resource to identify the resourceName value.

      Example NicClusterPolicy

      apiVersion: mellanox.com/v1alpha1
      kind: NicClusterPolicy
      spec:
        rdmaSharedDevicePlugin:
          config: |
            {
              "configList": [
                {
                  "resourceName": "rdma_shared_device_eth",
                  "rdmaHcaMax": 63,
                  "selectors": {
                    "ifNames": ["ens8f0np0"]
                  }
                }
              ]
            }

      In this example NicClusterPolicy resource, the resourceName value is rdma_shared_device_eth.

    2. Edit the PyTorchJob resource YAML code to add the following text:

      Example requests and limits for the network interface resources

                  resources:
                    limits:
                      nvidia.com/gpu: "1"
                      rdma/rdma_shared_device_eth: "1"
                    requests:
                      nvidia.com/gpu: "1"
                      rdma/rdma_shared_device_eth: "1"

    3. In the limits and requests sections, replace the resource name with the resource name from your NicClusterPolicy resource (in this example, rdma_shared_device_eth).
    4. Replace the specified value 1 with the number that you require. Ensure that the specified amount is available on your OpenShift cluster.
  7. Repeat the above steps to make the same edits in the Worker section of the PyTorchJob YAML code.
  8. Click Create.

You have created a multi-node PyTorch training job that is configured to run with RDMA.

You can see the entire YAML code for this example PyTorchJob resource in the Example Training Operator PyTorchJob resource configured to run with RDMA.
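To confirm that the RDMA resource is advertised on your nodes, you can inspect a node's status.allocatable map, for example from the output of oc get node <node-name> -o json. The following sketch filters such a map for rdma/ resources; the sample dictionary is illustrative, not real cluster output:

```python
# Illustrative allocatable map, as found under .status.allocatable in a Node object.
allocatable = {
    "cpu": "64",
    "memory": "263856Mi",
    "nvidia.com/gpu": "4",
    "rdma/rdma_shared_device_eth": "63",
}

# Keep only the RDMA resources advertised by the NVIDIA Network Operator.
rdma_resources = {k: v for k, v in allocatable.items() if k.startswith("rdma/")}
print(rdma_resources)  # {'rdma/rdma_shared_device_eth': '63'}
```

The resource name that appears here must match the name you use in the requests and limits sections of the PyTorchJob resource.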

Verification

  1. In the OpenShift Console, open the Administrator perspective.
  2. From the Project list, select your project.
  3. Click Home > Search, select PyTorchJob, and verify that the job was created.
  4. Click Workloads > Pods and verify that the requested master pod and worker pods are running.

This example shows how to create a Training Operator PyTorch training job that is configured to run with Remote Direct Memory Access (RDMA).

apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: job
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        metadata:
          annotations:
            k8s.v1.cni.cncf.io/networks: "example-net"
        spec:
          containers:
          - command:
            - /bin/bash
            - -c
            - "your container command"
            env:
            - name: NCCL_SOCKET_IFNAME
              value: "net1"
            - name: NCCL_IB_HCA
              value: "mlx5_1"
            image: quay.io/modh/training:py311-cuda124-torch251
            name: pytorch
            resources:
              limits:
                nvidia.com/gpu: "1"
                rdma/rdma_shared_device_eth: "1"
              requests:
                nvidia.com/gpu: "1"
                rdma/rdma_shared_device_eth: "1"
    Worker:
      replicas: 3
      restartPolicy: OnFailure
      template:
        metadata:
          annotations:
            k8s.v1.cni.cncf.io/networks: "example-net"
        spec:
          containers:
          - command:
            - /bin/bash
            - -c
            - "your container command"
            env:
            - name: NCCL_SOCKET_IFNAME
              value: "net1"
            - name: NCCL_IB_HCA
              value: "mlx5_1"
            image: quay.io/modh/training:py311-cuda124-torch251
            name: pytorch
            resources:
              limits:
                nvidia.com/gpu: "1"
                rdma/rdma_shared_device_eth: "1"
              requests:
                nvidia.com/gpu: "1"
                rdma/rdma_shared_device_eth: "1"