Chapter 4. Running Training Operator-based distributed training workloads
To reduce the time needed to train a Large Language Model (LLM), you can run the training job in parallel. In Red Hat OpenShift AI, the Kubeflow Training Operator and Kubeflow Training Operator Python Software Development Kit (Training Operator SDK) simplify the job configuration.
You can use the Training Operator and the Training Operator SDK to configure a training job in a variety of ways. For example, you can use multiple nodes and multiple GPUs per node, fine-tune a model, or configure a training job to use Remote Direct Memory Access (RDMA).
4.1. Using the Kubeflow Training Operator to run distributed training workloads
You can use the Training Operator PyTorchJob API to configure a PyTorchJob resource so that the training job runs on multiple nodes with multiple GPUs.
You can store the training script in a ConfigMap resource, or include it in a custom container image.
4.1.1. Creating a Training Operator PyTorch training script ConfigMap resource
You can create a ConfigMap resource to store the Training Operator PyTorch training script.
Alternatively, you can use the example Dockerfile to include the training script in a custom container image, as described in Creating a custom training image.
Prerequisites
- Your cluster administrator has installed Red Hat OpenShift AI with the required distributed training components as described in Installing the distributed workloads components (for disconnected environments, see Installing the distributed workloads components).
- You can access the OpenShift Console for the cluster where OpenShift AI is installed.
Procedure
- Log in to the OpenShift Console.
- Create a ConfigMap resource, as follows:
  - In the Administrator perspective, click Workloads > ConfigMaps.
  - From the Project list, select your project.
  - Click Create ConfigMap.
  - In the Configure via section, select the YAML view option.

    The Create ConfigMap page opens, with default YAML code automatically added.
  - Replace the default YAML code with your training-script code.

    For example training scripts, see Example Training Operator PyTorch training scripts.
  - Click Create.
Verification
- In the OpenShift Console, in the Administrator perspective, click Workloads > ConfigMaps.
- From the Project list, select your project.
- Click your ConfigMap resource to display the training script details.
4.1.2. Creating a Training Operator PyTorchJob resource
You can create a PyTorchJob resource to run the Training Operator PyTorch training script.
Prerequisites
- You can access an OpenShift cluster that has multiple worker nodes with supported NVIDIA GPUs or AMD GPUs.
Your cluster administrator has configured the cluster as follows:
- Installed Red Hat OpenShift AI with the required distributed training components, as described in Installing the distributed workloads components (for disconnected environments, see Installing the distributed workloads components).
- Configured the distributed training resources, as described in Managing distributed workloads.
- You can access a workbench that is suitable for distributed training, as described in Creating a workbench for distributed training.
You have administrator access for the project.
- If you created the project, you automatically have administrator access.
- If you did not create the project, your cluster administrator must give you administrator access.
Procedure
- Log in to the OpenShift Console.
- Create a PyTorchJob resource, as follows:
  - In the Administrator perspective, click Home > Search.
  - From the Project list, select your project.
  - Click the Resources list, and in the search field, start typing PyTorchJob.
  - Select PyTorchJob, and click Create PyTorchJob.

    The Create PyTorchJob page opens, with default YAML code automatically added.
  - Update the metadata to replace the name and namespace values with the values for your environment, as shown in the following example:

    metadata:
      name: pytorch-multi-node-job
      namespace: test-namespace

  - Configure the master node, as shown in the following example:

    spec:
      pytorchReplicaSpecs:
        Master:
          replicas: 1
          restartPolicy: OnFailure
          template:
            metadata:
              labels:
                app: pytorch-multi-node-job

    In the replicas entry, specify 1. Only one master node is needed.
  - To use a ConfigMap resource to provide the training script for the PyTorchJob pods, add the ConfigMap volume mount information, as shown in the following example:

    Adding the training script from a ConfigMap resource

    spec:
      pytorchReplicaSpecs:
        Master:
          ...
          template:
            spec:
              containers:
              - name: pytorch
                image: quay.io/modh/training:py311-cuda124-torch251
                command: ["python", "/workspace/scripts/train.py"]
                volumeMounts:
                - name: training-script-volume
                  mountPath: /workspace
              volumes:
              - name: training-script-volume
                configMap:
                  name: training-script-configmap

  - Add the appropriate resource constraints for your environment, as shown in the following example:

    Adding the resource constraints

    spec:
      pytorchReplicaSpecs:
        Master:
          ...
          template:
            spec:
              containers:
              ...
                resources:
                  requests:
                    cpu: "4"
                    memory: "8Gi"
                    nvidia.com/gpu: 2 # To use GPUs (Optional)
                  limits:
                    cpu: "4"
                    memory: "8Gi"
                    nvidia.com/gpu: 2
- Make similar edits in the Worker section of the PyTorchJob resource.
  - Update the replicas entry to specify the number of worker nodes.

  For a complete example PyTorchJob resource, see Example Training Operator PyTorchJob resource for multi-node training.
- Click Create.
Verification
- In the OpenShift Console, open the Administrator perspective.
- From the Project list, select your project.
- Click Home > Search, search for PyTorchJob, and verify that the job was created.
- Click Workloads > Pods and verify that the requested head pod and worker pods are running.
4.1.3. Creating a Training Operator PyTorchJob resource by using the CLI
You can use the OpenShift CLI (oc) to create a PyTorchJob resource to run the Training Operator PyTorch training script.
Prerequisites
- You can access an OpenShift cluster that has multiple worker nodes with supported NVIDIA GPUs or AMD GPUs.
Your cluster administrator has configured the cluster as follows:
- Installed Red Hat OpenShift AI with the required distributed training components, as described in Installing the distributed workloads components (for disconnected environments, see Installing the distributed workloads components).
- Configured the distributed training resources, as described in Managing distributed workloads.
- You can access a workbench that is suitable for distributed training, as described in Creating a workbench for distributed training.
You have administrator access for the project.
- If you created the project, you automatically have administrator access.
- If you did not create the project, your cluster administrator must give you administrator access.
You have installed the OpenShift CLI (oc) as described in the appropriate documentation for your cluster:
- Installing the OpenShift CLI for OpenShift Container Platform
- Installing the OpenShift CLI for Red Hat OpenShift Service on AWS
Procedure
Log in to the OpenShift CLI (oc), as follows:

Logging in to the OpenShift CLI (oc)

oc login --token=<token> --server=<server>

For information about how to find the server and token details, see Using the cluster server and token to authenticate.
Create a file named train.py and populate it with your training script, as follows:

Creating the training script

cat <<EOF > train.py
<paste your content here>
EOF

Replace <paste your content here> with your training script content.
For example training scripts, see Example Training Operator PyTorch training scripts.
Create a ConfigMap resource to store the training script, as follows:

Creating the ConfigMap resource

oc create configmap training-script-configmap --from-file=train.py -n <your-namespace>

Replace <your-namespace> with the name of your project.
Create a file named pytorchjob.yaml to define the distributed training job setup, as follows:

Defining the distributed training job

cat <<EOF > pytorchjob.yaml
<paste your content here>
EOF

Replace <paste your content here> with your training job content.
For an example training job, see Example Training Operator PyTorchJob resource for multi-node training.
Create the distributed training job, as follows:
Creating the distributed training job
oc apply -f pytorchjob.yaml
Verification
Monitor the running distributed training job, as follows:
Monitoring the distributed training job
oc get pytorchjobs -n <your-namespace>

Replace <your-namespace> with the name of your project.
Check the pod logs, as follows:
Checking the pod logs
oc logs <pod-name> -n <your-namespace>

Replace <pod-name> with the name of the pod, and replace <your-namespace> with the name of your project.
When you want to delete the job, run the following command:
Deleting the job
oc delete pytorchjobs/pytorch-multi-node-job -n <your-namespace>

Replace <your-namespace> with the name of your project.
4.1.4. Example Training Operator PyTorch training scripts
The following examples show how to configure a PyTorch training script for NVIDIA Collective Communications Library (NCCL), Distributed Data Parallel (DDP), and Fully Sharded Data Parallel (FSDP) training jobs.
If you have the required resources, you can run the example code without editing it.
Alternatively, you can modify the example code to specify the appropriate configuration for your training job.
4.1.4.1. Example Training Operator PyTorch training script: NCCL
This NVIDIA Collective Communications Library (NCCL) example returns the rank and tensor value for each accelerator.
import os
import torch
import torch.distributed as dist

def main():
    # Select backend dynamically: nccl for GPU, gloo for CPU
    backend = "nccl" if torch.cuda.is_available() else "gloo"

    # Initialize the process group
    dist.init_process_group(backend)

    # Get rank and world size
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # Select device dynamically
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    print(f"Running on rank {rank} out of {world_size} using {device} with backend {backend}.")

    # Initialize tensor on the selected device
    tensor = torch.zeros(1, device=device)
    if rank == 0:
        tensor += 1
        for i in range(1, world_size):
            dist.send(tensor, dst=i)
    else:
        dist.recv(tensor, src=0)

    print(f"Rank {rank}: Tensor value {tensor.item()} on {device}")

if __name__ == "__main__":
    main()
The backend value is automatically set to one of the following values:
- nccl: Uses NVIDIA Collective Communications Library (NCCL) for NVIDIA GPUs, or ROCm Communication Collectives Library (RCCL) for AMD GPUs
- gloo: Uses Gloo for CPUs
Specify backend="nccl" for both NVIDIA GPUs and AMD GPUs.
For AMD GPUs, even though the backend value is set to nccl, the ROCm environment uses RCCL for communication.
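The dynamic backend and device selection that the example scripts perform can be sketched in isolation. This is a minimal illustration of the logic above, not part of the training script itself; the boolean argument stands in for torch.cuda.is_available():

```python
# Minimal sketch of the backend/device selection used by the example scripts.
def select_backend_and_device(cuda_available: bool, local_rank: int = 0):
    """Return the process-group backend and the device string for this rank."""
    if cuda_available:
        # "nccl" also covers AMD GPUs: ROCm builds route it to RCCL.
        return "nccl", f"cuda:{local_rank}"
    return "gloo", "cpu"

print(select_backend_and_device(True, 1))   # GPU node
print(select_backend_and_device(False))     # CPU-only fallback
```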
4.1.4.2. Example Training Operator PyTorch training script: DDP
This example shows how to configure a training script for a Distributed Data Parallel (DDP) training job.
import os
import sys
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch import nn, optim

# Enable verbose logging
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "INFO"

def setup_ddp():
    """Initialize the distributed process group dynamically."""
    backend = "nccl" if torch.cuda.is_available() else "gloo"
    dist.init_process_group(backend=backend)

    local_rank = int(os.environ["LOCAL_RANK"])
    world_size = dist.get_world_size()

    # Ensure the correct device is set
    device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu")
    if torch.cuda.is_available():
        torch.cuda.set_device(local_rank)

    print(f"[Rank {local_rank}] Initialized with backend={backend}, world_size={world_size}")
    sys.stdout.flush()  # Ensure logs are visible in Kubernetes

    return local_rank, world_size, device

def cleanup():
    """Clean up the distributed process group."""
    dist.destroy_process_group()

class SimpleModel(nn.Module):
    """A simple model with multiple layers."""
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.layer1 = nn.Linear(1024, 512)
        self.layer2 = nn.Linear(512, 256)
        self.layer3 = nn.Linear(256, 128)
        self.layer4 = nn.Linear(128, 64)
        self.output = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.layer1(x))
        x = torch.relu(self.layer2(x))
        x = torch.relu(self.layer3(x))
        x = torch.relu(self.layer4(x))
        return self.output(x)

def log_ddp_parameters(model, rank):
    """Log model parameter count for DDP."""
    num_params = sum(p.numel() for p in model.parameters())
    print(f"[Rank {rank}] Model has {num_params} parameters (replicated across all ranks)")
    sys.stdout.flush()

def log_memory_usage(rank):
    """Log GPU memory usage if CUDA is available."""
    if torch.cuda.is_available():
        torch.cuda.synchronize()
        print(f"[Rank {rank}] GPU Memory Allocated: {torch.cuda.memory_allocated() / 1e6} MB")
        print(f"[Rank {rank}] GPU Memory Reserved: {torch.cuda.memory_reserved() / 1e6} MB")
        sys.stdout.flush()

def main():
    local_rank, world_size, device = setup_ddp()

    # Initialize model and wrap with DDP
    model = SimpleModel().to(device)
    model = DDP(model, device_ids=[local_rank] if torch.cuda.is_available() else None)

    print(f"[Rank {local_rank}] DDP Initialized")
    log_ddp_parameters(model, local_rank)
    log_memory_usage(local_rank)

    # Optimizer and criterion
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    # Dummy dataset (adjust for real-world use case)
    x = torch.randn(32, 1024).to(device)
    y = torch.randn(32, 1).to(device)

    # Training loop
    for epoch in range(5):
        model.train()
        optimizer.zero_grad()

        # Forward pass
        outputs = model(x)
        loss = criterion(outputs, y)

        # Backward pass
        loss.backward()
        optimizer.step()

        print(f"[Rank {local_rank}] Epoch {epoch}, Loss: {loss.item()}")
        log_memory_usage(local_rank)  # Track memory usage
        sys.stdout.flush()  # Ensure logs appear in real-time

    cleanup()

if __name__ == "__main__":
    main()
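Conceptually, DDP keeps the model replicas in sync by averaging the gradients across all ranks after each backward pass (an all-reduce operation). The following plain-Python sketch, using made-up gradient values, shows the arithmetic without requiring torch.distributed:

```python
# Each rank computes gradients on its own shard of the batch (values below are
# hypothetical). DDP then all-reduces them so every rank applies the same
# averaged gradient, which keeps the model replicas identical.
per_rank_grads = {
    0: [0.2, 0.4],  # gradients from rank 0's portion of the data
    1: [0.6, 0.8],  # gradients from rank 1's portion of the data
}
world_size = len(per_rank_grads)
averaged = [
    sum(grads[i] for grads in per_rank_grads.values()) / world_size
    for i in range(len(per_rank_grads[0]))
]
print(averaged)  # the same averaged gradient is applied on every rank
```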
4.1.4.3. Example Training Operator PyTorch training script: FSDP
This example shows how to configure a training script for a Fully Sharded Data Parallel (FSDP) training job.
import os
import sys
import torch
import torch.distributed as dist
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP, CPUOffload
from torch.distributed.fsdp.wrap import always_wrap_policy
from torch import nn, optim

# Enable verbose logging for debugging
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "INFO"  # Enables detailed FSDP logs

def setup_ddp():
    """Initialize the distributed process group dynamically."""
    backend = "nccl" if torch.cuda.is_available() else "gloo"
    dist.init_process_group(backend=backend)

    local_rank = int(os.environ["LOCAL_RANK"])
    world_size = dist.get_world_size()

    # Ensure the correct device is set
    device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu")
    if torch.cuda.is_available():
        torch.cuda.set_device(local_rank)

    print(f"[Rank {local_rank}] Initialized with backend={backend}, world_size={world_size}")
    sys.stdout.flush()  # Ensure logs are visible in Kubernetes

    return local_rank, world_size, device

def cleanup():
    """Clean up the distributed process group."""
    dist.destroy_process_group()

class SimpleModel(nn.Module):
    """A simple model with multiple layers."""
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.layer1 = nn.Linear(1024, 512)
        self.layer2 = nn.Linear(512, 256)
        self.layer3 = nn.Linear(256, 128)
        self.layer4 = nn.Linear(128, 64)
        self.output = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.layer1(x))
        x = torch.relu(self.layer2(x))
        x = torch.relu(self.layer3(x))
        x = torch.relu(self.layer4(x))
        return self.output(x)

def log_fsdp_parameters(model, rank):
    """Log FSDP parameters and sharding strategy."""
    num_params = sum(p.numel() for p in model.parameters())
    print(f"[Rank {rank}] Model has {num_params} parameters (sharded across {dist.get_world_size()} workers)")
    sys.stdout.flush()

def log_memory_usage(rank):
    """Log GPU memory usage if CUDA is available."""
    if torch.cuda.is_available():
        torch.cuda.synchronize()
        print(f"[Rank {rank}] GPU Memory Allocated: {torch.cuda.memory_allocated() / 1e6} MB")
        print(f"[Rank {rank}] GPU Memory Reserved: {torch.cuda.memory_reserved() / 1e6} MB")
        sys.stdout.flush()

def main():
    local_rank, world_size, device = setup_ddp()

    # Initialize model and wrap with FSDP
    model = SimpleModel().to(device)
    model = FSDP(
        model,
        cpu_offload=CPUOffload(offload_params=not torch.cuda.is_available()),  # Offload if no GPU
        auto_wrap_policy=always_wrap_policy,  # Wrap all layers automatically
    )

    print(f"[Rank {local_rank}] FSDP Initialized")
    log_fsdp_parameters(model, local_rank)
    log_memory_usage(local_rank)

    # Optimizer and criterion
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    # Dummy dataset (adjust for real-world use case)
    x = torch.randn(32, 1024).to(device)
    y = torch.randn(32, 1).to(device)

    # Training loop
    for epoch in range(5):
        model.train()
        optimizer.zero_grad()

        # Forward pass
        outputs = model(x)
        loss = criterion(outputs, y)

        # Backward pass
        loss.backward()
        optimizer.step()

        print(f"[Rank {local_rank}] Epoch {epoch}, Loss: {loss.item()}")
        log_memory_usage(local_rank)  # Track memory usage
        sys.stdout.flush()  # Ensure logs appear in real-time

    cleanup()

if __name__ == "__main__":
    main()
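To see why FSDP reduces per-rank memory compared with DDP, you can count the parameters of the SimpleModel defined above: DDP replicates all of them on every rank, while FSDP shards them roughly evenly across ranks. A small arithmetic sketch follows; the world size of 2 is an assumption for illustration:

```python
# Parameter count for the SimpleModel layers above: weights (d_in * d_out)
# plus biases (d_out) for each nn.Linear.
layer_dims = [(1024, 512), (512, 256), (256, 128), (128, 64), (64, 1)]
total_params = sum(d_in * d_out + d_out for d_in, d_out in layer_dims)

world_size = 2  # illustrative: two ranks in the job
per_rank_ddp = total_params                  # DDP: full replica on every rank
per_rank_fsdp = total_params // world_size   # FSDP: roughly a 1/world_size shard

print(total_params, per_rank_ddp, per_rank_fsdp)
```

The saving grows with the model: for the small example model it is modest, but for an LLM with billions of parameters, sharding is what makes the model fit at all.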
4.1.5. Example Dockerfile for a Training Operator PyTorch training script
You can use this example Dockerfile to include the training script in a custom training image.
FROM quay.io/modh/training:py311-cuda124-torch251
WORKDIR /workspace
COPY train.py /workspace/train.py
CMD ["python", "train.py"]
This example copies the training script to the default PyTorch image, and runs the script.
For more information about how to use this Dockerfile to include the training script in a custom container image, see Creating a custom training image.
4.1.6. Example Training Operator PyTorchJob resource for multi-node training
This example shows how to create a Training Operator PyTorch training job that runs on multiple nodes with multiple GPUs.
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-multi-node-job
  namespace: test-namespace
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        metadata:
          labels:
            app: pytorch-multi-node-job
        spec:
          containers:
          - name: pytorch
            image: quay.io/modh/training:py311-cuda124-torch251
            imagePullPolicy: IfNotPresent
            command: ["torchrun", "/workspace/train.py"]
            volumeMounts:
            - name: training-script-volume
              mountPath: /workspace
            resources:
              requests:
                cpu: "4"
                memory: "8Gi"
                nvidia.com/gpu: "2"
              limits:
                cpu: "4"
                memory: "8Gi"
                nvidia.com/gpu: "2"
          volumes:
          - name: training-script-volume
            configMap:
              name: training-script-configmap
    Worker:
      replicas: 1
      restartPolicy: OnFailure
      template:
        metadata:
          labels:
            app: pytorch-multi-node-job
        spec:
          containers:
          - name: pytorch
            image: quay.io/modh/training:py311-cuda124-torch251
            imagePullPolicy: IfNotPresent
            command: ["torchrun", "/workspace/train.py"]
            volumeMounts:
            - name: training-script-volume
              mountPath: /workspace
            resources:
              requests:
                cpu: "4"
                memory: "8Gi"
                nvidia.com/gpu: "2"
              limits:
                cpu: "4"
                memory: "8Gi"
                nvidia.com/gpu: "2"
          volumes:
          - name: training-script-volume
            configMap:
              name: training-script-configmap
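If you generate PyTorchJob manifests from a script, for example to parameterize the worker count, a minimal sketch is shown below. The helper name and its arguments are illustrative, not part of the Training Operator API; the output mirrors the structure of the example resource above:

```python
def pytorch_job_manifest(name, namespace, image, worker_replicas, gpus):
    """Build a PyTorchJob manifest as a plain dict (hypothetical helper)."""
    def replica_spec(replicas):
        resources = {"cpu": "4", "memory": "8Gi", "nvidia.com/gpu": str(gpus)}
        return {
            "replicas": replicas,
            "restartPolicy": "OnFailure",
            "template": {
                "spec": {
                    "containers": [{
                        "name": "pytorch",
                        "image": image,
                        "command": ["torchrun", "/workspace/train.py"],
                        "resources": {"requests": resources, "limits": resources},
                    }],
                },
            },
        }

    return {
        "apiVersion": "kubeflow.org/v1",
        "kind": "PyTorchJob",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "pytorchReplicaSpecs": {
                "Master": replica_spec(1),  # only one master node is needed
                "Worker": replica_spec(worker_replicas),
            },
        },
    }

job = pytorch_job_manifest(
    "pytorch-multi-node-job", "test-namespace",
    "quay.io/modh/training:py311-cuda124-torch251", worker_replicas=1, gpus=2,
)
print(job["spec"]["pytorchReplicaSpecs"]["Worker"]["replicas"])
```

You can serialize the resulting dict to YAML or JSON and apply it with oc, the same way as the hand-written example.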
4.2. Using the Training Operator SDK to run distributed training workloads
You can use the Training Operator SDK to configure a distributed training job to run on multiple nodes with multiple accelerators per node.
You can configure the PyTorchJob resource so that the training job runs on multiple nodes with multiple GPUs.
4.2.1. Configuring a training job by using the Training Operator SDK
Before you can run a job to train a model, you must configure the training job. You must set the training parameters, define the training function, and configure the Training Operator SDK.
The code in this procedure specifies how to configure an example training job. If you have the specified resources, you can run the example code without editing it.
Alternatively, you can modify the example code to specify the appropriate configuration for your training job.
Prerequisites
- You can access an OpenShift cluster that has sufficient worker nodes with supported accelerators to run your training or tuning job.
- You can access a workbench that is suitable for distributed training, as described in Creating a workbench for distributed training.
You have administrator access for the project.
- If you created the project, you automatically have administrator access.
- If you did not create the project, your cluster administrator must give you administrator access.
Procedure
Open the workbench, as follows:
- Log in to the Red Hat OpenShift AI web console.
- Click Projects and click your project.
- Click the Workbenches tab.
- If your workbench is not already running, start the workbench.
- Click the Open link to open the IDE in a new window.
- Click File > New Notebook.
- Create the training function, as follows:

  Create a cell with the following content:

  Example training function

  def train_func():
      import os
      import torch
      import torch.distributed as dist

      # Select backend dynamically: nccl for GPU, gloo for CPU
      backend = "nccl" if torch.cuda.is_available() else "gloo"

      # Initialize the process group
      dist.init_process_group(backend)

      # Get rank and world size
      rank = dist.get_rank()
      world_size = dist.get_world_size()

      # Select device dynamically
      device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

      # Log rank initialization
      print(f"Rank {rank}/{world_size} initialized with backend {backend} on device {device}.")

      # Initialize tensor on the selected device
      tensor = torch.zeros(1, device=device)
      if rank == 0:
          tensor += 1
          for i in range(1, world_size):
              dist.send(tensor, dst=i)
      else:
          dist.recv(tensor, src=0)

      print(f"Rank {rank}: Tensor value {tensor.item()} on {device}")

      # Cleanup
      dist.destroy_process_group()

  Note: For this example training job, you do not need to install any additional packages or set any training parameters.
For more information about how to add additional packages and set the training parameters, see Configuring the fine-tuning job.
- Optional: Edit the content to specify the appropriate values for your environment.
- Run the cell to create the training function.
Configure the Training Operator SDK client authentication as follows:
Create a cell with the following content:
Example Training Operator SDK client authentication
from kubernetes import client
from kubeflow.training import TrainingClient
from kubeflow.training.models import V1Volume, V1VolumeMount, V1PersistentVolumeClaimVolumeSource

api_server = "<API_SERVER>"
token = "<TOKEN>"

configuration = client.Configuration()
configuration.host = api_server
configuration.api_key = {"authorization": f"Bearer {token}"}
# Un-comment if your cluster API server uses a self-signed certificate or an un-trusted CA
#configuration.verify_ssl = False

api_client = client.ApiClient(configuration)
client = TrainingClient(client_configuration=api_client.configuration)

Edit the api_server and token parameters to enter the values to authenticate to your OpenShift cluster.

For information on how to find the server and token details, see Using the cluster server and token to authenticate.
- Run the cell to configure the Training Operator SDK client authentication.
- Click File > Save Notebook As, enter an appropriate file name, and click Save.
Verification
- All cells run successfully.
4.2.2. Running a training job by using the Training Operator SDK
When you run a training job to tune a model, you must specify the resources needed, and provide any authorization credentials required.
The code in this procedure specifies how to run the example training job. If you have the specified resources, you can run the example code without editing it.
Alternatively, you can modify the example code to specify the appropriate details for your training job.
Prerequisites
- You can access an OpenShift cluster that has sufficient worker nodes with supported accelerators to run your training or tuning job.
- You can access a workbench that is suitable for distributed training, as described in Creating a workbench for distributed training.
You have administrator access for the project.
- If you created the project, you automatically have administrator access.
- If you did not create the project, your cluster administrator must give you administrator access.
- You have enabled your project for Kueue management by applying the kueue.openshift.io/managed=true label to the project namespace.
- You have created resource flavor, cluster queue, and local queue Kueue objects for your project. For more information about creating these objects, see Configuring quota management for distributed workloads.
- You have access to a model.
- You have access to data that you can use to train the model.
- You have configured the training job as described in Configuring a training job by using the Training Operator SDK.
Procedure
Open the workbench, as follows:
- Log in to the Red Hat OpenShift AI web console.
- Click Projects and click your project.
- Click the Workbenches tab. If your workbench is not already running, start the workbench.
- Click the Open link to open the IDE in a new window.
- Click File > Open, and open the Jupyter notebook that you used to configure the training job.
- Create a cell to run the job, and add the following content:

  # Start a PyTorchJob with 2 workers and 2 GPUs per worker (multi-node, multi-worker job).
  client.create_job(
      name="pytorch-ddp",
      train_func=train_func,
      base_image="quay.io/modh/training:py311-cuda124-torch251",
      num_workers=2,
      resources_per_worker={"nvidia.com/gpu": "2"},
      packages_to_install=["torchvision==0.19.0"],
      env_vars={"NCCL_DEBUG": "INFO", "TORCH_DISTRIBUTED_DEBUG": "DETAIL"},
      labels={
          "kueue.x-k8s.io/queue-name": "<local-queue-name>",
          "key": "value"
      },
      annotations={"key": "value"}
  )

  Edit the content to specify the appropriate values for your environment, as follows:
- Edit the num_workers value to specify the number of worker nodes.
- Update the resources_per_worker values according to the job requirements and the resources available.
- Edit the value of the kueue.x-k8s.io/queue-name label to match the name of your target LocalQueue.
- The example provided is for NVIDIA GPUs. If you use AMD accelerators, make the following additional changes:
  - In the resources_per_worker entry, change nvidia.com/gpu to amd.com/gpu.
  - Change the base_image value to quay.io/modh/training:py311-rocm62-torch251.
  - Remove the NCCL_DEBUG entry.

If the job_kind value is not explicitly set, the TrainingClient API automatically sets the job_kind value to PyTorchJob.
- Run the cell to run the job.
Verification
View the progress of the job as follows:
Create a cell with the following content:
client.get_job_logs(
    name="pytorch-ddp",
    job_kind="PyTorchJob",
    follow=True,
)

- Run the cell to view the job progress.
4.3. Fine-tuning a model by using Kubeflow Training
Supervised fine-tuning (SFT) is the process of customizing a Large Language Model (LLM) for a specific task by using labeled data. In this example, you use the Kubeflow Training Operator and the Kubeflow Training Operator Python Software Development Kit (Training Operator SDK) to perform supervised fine-tuning of an LLM in Red Hat OpenShift AI, by using the Hugging Face SFT Trainer.
Optionally, you can use Low-Rank Adaptation (LoRA) to efficiently fine-tune large language models. LoRA optimizes computational requirements and reduces memory footprint, enabling you to fine-tune on consumer-grade GPUs. With SFT, you can combine PyTorch Fully Sharded Data Parallel (FSDP) and LoRA to enable scalable, cost-effective model training and inference, enhancing the flexibility and performance of AI workloads within OpenShift environments.
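The memory saving from LoRA comes from training two low-rank factors instead of a full weight update: for a (d_out x d_in) weight matrix, LoRA trains B (d_out x r) and A (r x d_in). A back-of-the-envelope sketch follows; the 4096 x 4096 projection size is an assumption chosen to resemble an 8B-class model, and r = 16 matches the lora_r value used in the fine-tuning parameters later in this chapter:

```python
# Compare trainable parameters: full weight update vs. LoRA factors.
def full_update_params(d_out, d_in):
    return d_out * d_in

def lora_update_params(d_out, d_in, r):
    # B is (d_out x r), A is (r x d_in); only these factors are trained.
    return d_out * r + r * d_in

d_out = d_in = 4096  # illustrative attention-projection size
r = 16               # LoRA rank

full = full_update_params(d_out, d_in)
lora = lora_update_params(d_out, d_in, r)
print(full, lora, f"{lora / full:.4%}")
```

For this matrix, LoRA trains well under one percent of the parameters that a full update would, which is why fine-tuning fits on much smaller GPUs.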
4.3.1. Configuring the fine-tuning job
Before you can use a training job to fine-tune a model, you must configure the training job. You must set the training parameters, define the training function, and configure the Training Operator SDK.
The code in this procedure specifies how to configure an example fine-tuning job. If you have the specified resources, you can run the example code without editing it.
Alternatively, you can modify the example code to specify the appropriate configuration for your fine-tuning job.
Prerequisites
You can access an OpenShift cluster that has sufficient worker nodes with supported accelerators to run your training or tuning job.
The example fine-tuning job requires 8 worker nodes, where each worker node has 64 GiB memory, 4 CPUs, and 1 NVIDIA GPU.
- You can access a workbench that is suitable for distributed training, as described in Creating a workbench for distributed training.
- You can access a dynamic storage provisioner that supports ReadWriteMany (RWX) Persistent Volume Claim (PVC) provisioning, such as Red Hat OpenShift Data Foundation.
You have administrator access for the project.
- If you created the project, you automatically have administrator access.
- If you did not create the project, your cluster administrator must give you administrator access.
Procedure
Open the workbench, as follows:
- Log in to the Red Hat OpenShift AI web console.
- Click Projects and click your project.
- Click the Workbenches tab.
- Ensure that the workbench uses a storage class with RWX capability.
- If your workbench is not already running, start the workbench.
- Click the Open link to open the IDE in a new window.
- Click File > New Notebook.
- Install any additional packages that are needed to run the training or tuning job.

  In a notebook cell, add the code to install the additional packages, as follows:

  Code to install dependencies

  # Install the yamlmagic package
  !pip install yamlmagic
  %load_ext yamlmagic

  !pip install git+https://github.com/kubeflow/trainer.git@release-1.9#subdirectory=sdk/python

  Select the cell, and click Run > Run selected cell.
The additional packages are installed.
Set the training parameters as follows:
Create a cell with the following content:
%%yaml parameters

# Model
model_name_or_path: Meta-Llama/Meta-Llama-3.1-8B-Instruct
model_revision: main
torch_dtype: bfloat16
attn_implementation: flash_attention_2

# PEFT / LoRA
use_peft: true
lora_r: 16
lora_alpha: 8
lora_dropout: 0.05
lora_target_modules: ["q_proj", "v_proj", "k_proj", "o_proj", "gate_proj", "up_proj", "down_proj"]
lora_modules_to_save: []
init_lora_weights: true

# Quantization / BitsAndBytes
load_in_4bit: false                  # use 4 bit precision for the base model (only with LoRA)
load_in_8bit: false                  # use 8 bit precision for the base model (only with LoRA)

# Datasets
dataset_name: gsm8k                  # id or path to the dataset
dataset_config: main                 # name of the dataset configuration
dataset_train_split: train           # dataset split to use for training
dataset_test_split: test             # dataset split to use for evaluation
dataset_text_field: text             # name of the text field of the dataset
dataset_kwargs:
  add_special_tokens: false          # template with special tokens
  append_concat_token: false         # add additional separator token

# SFT
max_seq_length: 1024                 # max sequence length for model and packing of the dataset
dataset_batch_size: 1000             # samples to tokenize per batch
packing: false
use_liger: false

# Training
num_train_epochs: 10                 # number of training epochs
per_device_train_batch_size: 32      # batch size per device during training
per_device_eval_batch_size: 32       # batch size for evaluation
auto_find_batch_size: false          # find a batch size that fits into memory automatically
eval_strategy: epoch                 # evaluate every epoch
bf16: true                           # use bf16 16-bit (mixed) precision
tf32: false                          # use tf32 precision
learning_rate: 1.0e-4                # initial learning rate
warmup_steps: 10                     # steps for a linear warmup from 0 to `learning_rate`
lr_scheduler_type: inverse_sqrt      # learning rate scheduler (see transformers.SchedulerType)
optim: adamw_torch_fused             # optimizer (see transformers.OptimizerNames)
max_grad_norm: 1.0                   # max gradient norm
seed: 42
gradient_accumulation_steps: 1       # number of steps before performing a backward/update pass
gradient_checkpointing: false        # use gradient checkpointing to save memory
gradient_checkpointing_kwargs:
  use_reentrant: false

# FSDP
fsdp: "full_shard auto_wrap offload" # remove offload if enough GPU memory
fsdp_config:
  activation_checkpointing: true
  cpu_ram_efficient_loading: false
  sync_module_states: true
  use_orig_params: true
  limit_all_gathers: false

# Checkpointing
save_strategy: epoch                 # save checkpoint every epoch
save_total_limit: 1                  # limit the total amount of checkpoints
resume_from_checkpoint: false        # load the last checkpoint in output_dir and resume from it

# Logging
log_level: warning                   # logging level (see transformers.logging)
logging_strategy: steps
logging_steps: 1                     # log every N steps
report_to:
  - tensorboard                      # report metrics to tensorboard
output_dir: /mnt/shared/Meta-Llama-3.1-8B-Instruct

- Optional: If you specify a different model or dataset, edit the parameters to suit your model, dataset, and resources. If necessary, update the previous cell to specify the dependencies for your training or tuning job.
- Run the cell to set the training parameters.
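The %%yaml cell magic turns the cell body into a Python dict named parameters, which is later passed to TrlParser.parse_dict and to the job submission call. As a rough sketch (a hand-written subset of the same keys, not the magic itself), the result is equivalent to:

```python
# A hand-written subset of the dict that the %%yaml parameters cell produces.
# Only a few keys are shown; the values match the example cell above.
parameters = {
    "model_name_or_path": "Meta-Llama/Meta-Llama-3.1-8B-Instruct",
    "use_peft": True,
    "lora_r": 16,
    "num_train_epochs": 10,
    "per_device_train_batch_size": 32,
    "output_dir": "/mnt/shared/Meta-Llama-3.1-8B-Instruct",
}
print(parameters["model_name_or_path"])
```

Because parse_dict accepts a plain dict, any way of building such a dict (YAML magic, json.load, or a literal) works the same way.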
Create the training function as follows:
Create a cell with the following content:
def main(parameters):
    import random

    from datasets import load_dataset
    from transformers import (
        AutoTokenizer,
        set_seed,
    )
    from trl import (
        ModelConfig,
        ScriptArguments,
        SFTConfig,
        SFTTrainer,
        TrlParser,
        get_peft_config,
        get_quantization_config,
        get_kbit_device_map,
    )

    parser = TrlParser((ScriptArguments, SFTConfig, ModelConfig))
    script_args, training_args, model_args = parser.parse_dict(parameters)

    # Set seed for reproducibility
    set_seed(training_args.seed)

    # Model and tokenizer
    quantization_config = get_quantization_config(model_args)
    model_kwargs = dict(
        revision=model_args.model_revision,
        trust_remote_code=model_args.trust_remote_code,
        attn_implementation=model_args.attn_implementation,
        torch_dtype=model_args.torch_dtype,
        use_cache=False if training_args.gradient_checkpointing
            or training_args.fsdp_config.get("activation_checkpointing", False) else True,
        device_map=get_kbit_device_map() if quantization_config is not None else None,
        quantization_config=quantization_config,
    )
    training_args.model_init_kwargs = model_kwargs
    tokenizer = AutoTokenizer.from_pretrained(
        model_args.model_name_or_path, trust_remote_code=model_args.trust_remote_code, use_fast=True
    )
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # You can override the template here according to your use case
    # tokenizer.chat_template = ...

    # Datasets
    train_dataset = load_dataset(
        path=script_args.dataset_name,
        name=script_args.dataset_config,
        split=script_args.dataset_train_split,
    )
    test_dataset = None
    if training_args.eval_strategy != "no":
        test_dataset = load_dataset(
            path=script_args.dataset_name,
            name=script_args.dataset_config,
            split=script_args.dataset_test_split,
        )

    # Templatize datasets
    def template_dataset(sample):
        messages = [
            {"role": "user", "content": sample["question"]},
            {"role": "assistant", "content": sample["answer"]},
        ]
        return {"text": tokenizer.apply_chat_template(messages, tokenize=False)}

    train_dataset = train_dataset.map(template_dataset, remove_columns=["question", "answer"])
    if training_args.eval_strategy != "no":
        test_dataset = test_dataset.map(template_dataset, remove_columns=["question", "answer"])

    # Check random samples
    with training_args.main_process_first(desc="Log few samples from the training set"):
        for index in random.sample(range(len(train_dataset)), 2):
            print(train_dataset[index]["text"])

    # Training
    trainer = SFTTrainer(
        model=model_args.model_name_or_path,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=test_dataset,
        peft_config=get_peft_config(model_args),
        tokenizer=tokenizer,
    )

    if trainer.accelerator.is_main_process and hasattr(trainer.model, "print_trainable_parameters"):
        trainer.model.print_trainable_parameters()

    checkpoint = None
    if training_args.resume_from_checkpoint is not None:
        checkpoint = training_args.resume_from_checkpoint
    trainer.train(resume_from_checkpoint=checkpoint)

    trainer.save_model(training_args.output_dir)

    with training_args.main_process_first(desc="Training completed"):
        print(f"Training completed, model checkpoint written to {training_args.output_dir}")
- Optional: If you specify a different model or dataset, edit the tokenizer.chat_template parameter to specify the appropriate value for your model and dataset.
- Run the cell to create the training function.
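The template_dataset function in the training function turns each question/answer pair into a single text field by applying the tokenizer's chat template. A minimal sketch of that transformation, using a hypothetical stand-in for apply_chat_template (the real template depends on your model):

```python
# Hypothetical stand-in for tokenizer.apply_chat_template, for illustration
# only; the actual output format is defined by your model's chat template.
def fake_apply_chat_template(messages, tokenize=False):
    return "".join(f"<|{m['role']}|>{m['content']}</s>" for m in messages)

sample = {"question": "What is 2 + 2?", "answer": "4"}
messages = [
    {"role": "user", "content": sample["question"]},
    {"role": "assistant", "content": sample["answer"]},
]
print(fake_apply_chat_template(messages))
# -> <|user|>What is 2 + 2?</s><|assistant|>4</s>
```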
Configure the Training Operator SDK client authentication as follows:
Create a cell with the following content:
from kubernetes import client
from kubeflow.training import TrainingClient
from kubeflow.training.models import V1Volume, V1VolumeMount, V1PersistentVolumeClaimVolumeSource

api_server = "<API_SERVER>"
token = "<TOKEN>"

configuration = client.Configuration()
configuration.host = api_server
configuration.api_key = {"authorization": f"Bearer {token}"}
# Un-comment if your cluster API server uses a self-signed certificate or an un-trusted CA
#configuration.verify_ssl = False

api_client = client.ApiClient(configuration)
client = TrainingClient(client_configuration=api_client.configuration)

Edit the api_server and token parameters to enter the values to authenticate to your OpenShift cluster.
For information about how to find the server and token details, see Using the cluster server and token to authenticate.
- Run the cell to configure the Training Operator SDK client authentication.
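Rather than hard-coding credentials in the notebook, you can read them from environment variables before building the client configuration. A sketch (the OCP_API_SERVER and OCP_TOKEN variable names are illustrative, not required by the SDK):

```python
import os

# Illustrative environment variable names; set them in the workbench
# environment so the notebook contains no secrets.
api_server = os.environ.get("OCP_API_SERVER", "<API_SERVER>")
token = os.environ.get("OCP_TOKEN", "<TOKEN>")

# The same bearer-token header shape that the configuration above uses
headers = {"authorization": f"Bearer {token}"}
print(headers["authorization"].startswith("Bearer "))
```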
- Click File > Save Notebook As, enter an appropriate file name, and click Save.
Verification
- All cells run successfully.
4.3.2. Running the fine-tuning job
When you run a training job to tune a model, you must specify the resources needed, and provide any authorization credentials required.
The code in this procedure specifies how to run the example fine-tuning job. If you have the specified resources, you can run the example code without editing it.
Alternatively, you can modify the example code to specify the appropriate details for your fine-tuning job.
Prerequisites
- You can access an OpenShift cluster that has sufficient worker nodes with supported accelerators to run your training or tuning job.
  The example fine-tuning job requires 8 worker nodes, where each worker node has 64 GiB memory, 4 CPUs, and 1 NVIDIA GPU.
- You can access a workbench that is suitable for distributed training, as described in Creating a workbench for distributed training.
- You have administrator access for the project.
  - If you created the project, you automatically have administrator access.
  - If you did not create the project, your cluster administrator must give you administrator access.
- You have access to a model.
- You have access to data that you can use to train the model.
- You have configured the fine-tuning job as described in Configuring the fine-tuning job.
- You can access a dynamic storage provisioner that supports ReadWriteMany (RWX) Persistent Volume Claim (PVC) provisioning, such as Red Hat OpenShift Data Foundation.
- A PersistentVolumeClaim resource named shared with RWX access mode is attached to your workbench.
- You have a Hugging Face account and access token. For more information, search for "user access tokens" in the Hugging Face documentation.
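As a quick sanity check before scheduling the job, the aggregate cluster footprint of the example follows directly from the per-worker figures listed in the prerequisites above:

```python
# Per-worker figures from the example fine-tuning job
workers = 8
per_worker = {"gpu": 1, "memory_gib": 64, "cpu": 4}

# Aggregate footprint the cluster must be able to provide
totals = {key: value * workers for key, value in per_worker.items()}
print(totals)  # -> {'gpu': 8, 'memory_gib': 512, 'cpu': 32}
```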
Procedure
Open the workbench, as follows:
- Log in to the Red Hat OpenShift AI web console.
- Click Projects and click your project.
- Click the Workbenches tab. If your workbench is not already running, start the workbench.
- Click the Open link to open the IDE in a new window.
- Click File → Open, and open the Jupyter notebook that you used to configure the fine-tuning job.
- Create a cell to run the job, and add the following content:
client.create_job(
    job_kind="PyTorchJob",
    name="sft",
    train_func=main,
    num_workers=8,
    num_procs_per_worker="1",
    resources_per_worker={
        "nvidia.com/gpu": 1,
        "memory": "64Gi",
        "cpu": 4,
    },
    base_image="quay.io/modh/training:py311-cuda124-torch251",
    env_vars={
        # Hugging Face
        "HF_HOME": "/mnt/shared/.cache",
        "HF_TOKEN": "",
        # CUDA
        "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True",
        # NCCL
        "NCCL_DEBUG": "INFO",
        "NCCL_ENABLE_DMABUF_SUPPORT": "1",
    },
    packages_to_install=[
        "tensorboard",
    ],
    parameters=parameters,
    volumes=[
        V1Volume(name="shared", persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name="shared")),
    ],
    volume_mounts=[
        V1VolumeMount(name="shared", mount_path="/mnt/shared"),
    ],
)

Edit the HF_TOKEN value to specify your Hugging Face access token.
Optional: If you specify a different model, and your model is not a gated model from the Hugging Face Hub, remove the HF_HOME and HF_TOKEN entries.
Optional: Edit the other content to specify the appropriate values for your environment, as follows:
- Edit the num_workers value to specify the number of worker nodes.
- Update the resources_per_worker values according to the job requirements and the resources available.
  The example provided is for NVIDIA GPUs. If you use AMD accelerators, make the following additional changes:
  - In the resources_per_worker entry, change nvidia.com/gpu to amd.com/gpu.
  - Change the base_image value to quay.io/modh/training:py311-rocm62-torch251.
  - Remove the CUDA and NCCL entries.
- If the RWX PersistentVolumeClaim resource that is attached to your workbench has a name other than shared, update the following values to replace shared with your PVC name:
  - In this cell, update the HF_HOME value.
  - In this cell, in the volumes entry, update the PVC details:
    - In the V1Volume entry, update the name and claim_name values.
    - In the volume_mounts entry, update the name and mount_path values.
  - In the cell where you set the training parameters, update the output_dir value.
    For more information about setting the training parameters, see Configuring the fine-tuning job.
- Run the cell to run the job.
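The NVIDIA-to-AMD changes described above can be captured in a small helper. This is a sketch only (the helper is not part of the SDK; the image names and resource keys are the ones quoted in this section):

```python
# Build the accelerator-specific arguments for create_job.
# Image names and resource keys are the ones quoted in this section.
def worker_spec(accelerator="nvidia"):
    if accelerator == "amd":
        gpu_key = "amd.com/gpu"
        base_image = "quay.io/modh/training:py311-rocm62-torch251"
    else:
        gpu_key = "nvidia.com/gpu"
        base_image = "quay.io/modh/training:py311-cuda124-torch251"
    return {
        "resources_per_worker": {gpu_key: 1, "memory": "64Gi", "cpu": 4},
        "base_image": base_image,
    }

print(worker_spec("amd")["resources_per_worker"])
# -> {'amd.com/gpu': 1, 'memory': '64Gi', 'cpu': 4}
```

You would then unpack the result into the create_job call, for example client.create_job(..., **worker_spec("amd")), and also drop the CUDA and NCCL env_vars entries as described above.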
Verification
View the progress of the job as follows:
Create a cell with the following content:
client.get_job_logs(
    name="sft",
    job_kind="PyTorchJob",
    follow=True,
)

- Run the cell to view the job progress.
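Because get_job_logs streams plain text, you can post-process it in the notebook. A sketch (not part of the SDK) that pulls loss values out of log lines shaped like typical Hugging Face Trainer output:

```python
import re

# Example log lines in the shape that the Hugging Face Trainer typically
# prints; real lines come from the get_job_logs stream.
log_lines = [
    "{'loss': 1.234, 'epoch': 1.0}",
    "{'loss': 0.987, 'epoch': 2.0}",
]

# Extract the numeric loss from each matching line
losses = [float(m.group(1)) for line in log_lines
          if (m := re.search(r"'loss': ([0-9.]+)", line))]
print(losses)  # -> [1.234, 0.987]
```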
4.3.3. Deleting the fine-tuning job
When you no longer need the fine-tuning job, delete the job to release the resources.
The code in this procedure specifies how to delete the example fine-tuning job. If you created the example fine-tuning job named sft, you can run the example code without editing it.
Alternatively, you can modify this example code to specify the name of your fine-tuning job.
Prerequisites
- You have created a fine-tuning job as described in Running the fine-tuning job.
Procedure
Open the workbench, as follows:
- Log in to the Red Hat OpenShift AI web console.
- Click Projects and click your project.
- Click the Workbenches tab. If your workbench is not already running, start the workbench.
- Click the Open link to open the IDE in a new window.
- Click File → Open, and open the Jupyter notebook that you used to configure and run the example fine-tuning job.
- Create a cell with the following content:
client.delete_job(name="sft")

- Optional: If you want to delete a different job, edit the content to replace sft with the name of your job.
- Run the cell to delete the job.
Verification
- In the OpenShift Console, in the Administrator perspective, click Workloads → Jobs.
- Verify that the specified job is not listed.
4.4. Creating a multi-node PyTorch training job with RDMA
NVIDIA GPUDirect RDMA uses Remote Direct Memory Access (RDMA) to provide direct GPU interconnect, enabling peripheral devices to access NVIDIA GPU memory in remote systems directly. RDMA improves the training job performance because it eliminates the overhead of using the operating system CPUs and memory. Running a training job on multiple nodes using multiple GPUs can significantly reduce the completion time.
In Red Hat OpenShift AI, NVIDIA GPUs can communicate directly by using GPUDirect RDMA across the following types of network:
- Ethernet: RDMA over Converged Ethernet (RoCE)
- InfiniBand
Before you create a PyTorch training job in a cluster configured for RDMA, you must configure the job to use the high-speed network interfaces.
Prerequisites
- You can access an OpenShift cluster that has multiple worker nodes with supported NVIDIA GPUs.
Your cluster administrator has configured the cluster as follows:
- Installed Red Hat OpenShift AI with the required distributed training components, as described in Installing the distributed workloads components (for disconnected environments, see Installing the distributed workloads components).
- Configured the distributed training resources, as described in Managing distributed workloads.
- Configured the cluster for RDMA, as described in Configuring a cluster for RDMA.
Procedure
- Log in to the OpenShift Console.
Create a PyTorchJob resource, as follows:
- In the Administrator perspective, click Home → Search.
- From the Project list, select your project.
- Click the Resources list, and in the search field, start typing PyTorchJob.
- Select PyTorchJob, and click Create PyTorchJob.
The Create PyTorchJob page opens, with default YAML code automatically added.
Attach the high-speed network interface to the PyTorchJob pods, as follows:
Edit the PyTorchJob resource YAML code to include an annotation that adds the pod to an additional network, as shown in the following example:
Example annotation to attach network interface to pod

spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        metadata:
          annotations:
            k8s.v1.cni.cncf.io/networks: "example-net"

- Replace the example network name example-net with the appropriate value for your configuration.
Configure the job to use NVIDIA Collective Communications Library (NCCL) interfaces, as follows:
Edit the PyTorchJob resource YAML code to add the following environment variables:
Example environment variables

spec:
  containers:
    - command:
        - /bin/bash
        - -c
        - "your container command"
      env:
        - name: NCCL_SOCKET_IFNAME
          value: "net1"
        - name: NCCL_IB_HCA
          value: "mlx5_1"

Replace the example environment-variable values with the appropriate values for your configuration:
- Set the NCCL_SOCKET_IFNAME environment variable to specify the IP interface to use for communication.
- Optional: To explicitly specify the Host Channel Adapter (HCA) that NCCL should use, set the NCCL_IB_HCA environment variable.
Specify the base training image name, as follows:
Edit the PyTorchJob resource YAML code to add the following text:
Example base training image

image: quay.io/modh/training:py311-cuda124-torch251

If you want to use a different base training image, replace the image name accordingly.
For a list of supported training images, see Supported Configurations for 3.x.
Specify the requests and limits for the network interface resources.
The name of the resource varies, depending on the NVIDIA Network Operator configuration. The resource name might depend on the deployment mode, and is specified in the NicClusterPolicy resource.
Note: You must use the resource name that matches your configuration. The name must correspond to the value advertised by the NVIDIA Network Operator on the cluster nodes.
The following example is for RDMA over Converged Ethernet (RoCE), where the Ethernet RDMA devices are using the RDMA shared device mode.
Review the NicClusterPolicy resource to identify the resourceName value.
Example NicClusterPolicy

apiVersion: mellanox.com/v1alpha1
kind: NicClusterPolicy
spec:
  rdmaSharedDevicePlugin:
    config: |
      {
        "configList": [
          {
            "resourceName": "rdma_shared_device_eth",
            "rdmaHcaMax": 63,
            "selectors": {
              "ifNames": ["ens8f0np0"]
            }
          }
        ]
      }

In this example NicClusterPolicy resource, the resourceName value is rdma_shared_device_eth.
PyTorchJobresource YAML code to add the following text:Example requests and limits for the network interface resources
resources: limits: nvidia.com/gpu: "1" rdma/rdma_shared_device_eth: "1" requests: nvidia.com/gpu: "1" rdma/rdma_shared_device_eth: "1"-
In the
limitsandrequestssections, replace the resource name with the resource name from yourNicClusterPolicyresource (in this example,rdma_shared_device_eth). -
Replace the specified value
1with the number that you require. Ensure that the specified amount is available on your OpenShift cluster.
-
Repeat the above steps to make the same edits in the
Workersection of thePyTorchJobYAML code. - Click Create.
You have created a multi-node PyTorch training job that is configured to run with RDMA.
You can see the entire YAML code for this example PyTorchJob resource in the Example Training Operator PyTorchJob resource configured to run with RDMA.
Verification
- In the OpenShift Console, open the Administrator perspective.
- From the Project list, select your project.
- Click Home → Search, search for PyTorchJob, and verify that the job was created.
- Click Workloads → Pods, and verify that the requested head pod and worker pods are running.
4.5. Example Training Operator PyTorchJob resource configured to run with RDMA
This example shows how to create a Training Operator PyTorch training job that is configured to run with Remote Direct Memory Access (RDMA).
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
name: job
spec:
pytorchReplicaSpecs:
Master:
replicas: 1
restartPolicy: OnFailure
template:
metadata:
annotations:
k8s.v1.cni.cncf.io/networks: "example-net"
spec:
containers:
- command:
- /bin/bash
- -c
- "your container command"
env:
- name: NCCL_SOCKET_IFNAME
value: "net1"
- name: NCCL_IB_HCA
value: "mlx5_1"
image: quay.io/modh/training:py311-cuda124-torch251
name: pytorch
resources:
limits:
nvidia.com/gpu: "1"
rdma/rdma_shared_device_eth: "1"
requests:
nvidia.com/gpu: "1"
rdma/rdma_shared_device_eth: "1"
Worker:
replicas: 3
restartPolicy: OnFailure
template:
metadata:
annotations:
k8s.v1.cni.cncf.io/networks: "example-net"
spec:
containers:
- command:
- /bin/bash
- -c
- "your container command"
env:
- name: NCCL_SOCKET_IFNAME
value: "net1"
- name: NCCL_IB_HCA
value: "mlx5_1"
image: quay.io/modh/training:py311-cuda124-torch251
name: pytorch
resources:
limits:
nvidia.com/gpu: "1"
rdma/rdma_shared_device_eth: "1"
requests:
nvidia.com/gpu: "1"
rdma/rdma_shared_device_eth: "1"
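As a quick cross-check of the example resource, the pod and GPU totals it implies can be computed from the replica counts (a sketch; the values are the ones in the YAML above):

```python
# Replica counts and per-pod GPU request from the example PyTorchJob above
replicas = {"Master": 1, "Worker": 3}
gpus_per_pod = 1

total_pods = sum(replicas.values())
total_gpus = total_pods * gpus_per_pod
print(total_pods, total_gpus)  # -> 4 4
```

This matches the verification step: one head (Master) pod and three worker pods, each requesting one NVIDIA GPU and one RDMA shared device.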