PyTorch on Rocket

Introduction

PyTorch is an open-source Python package that provides tensor computation with GPU acceleration, similar to NumPy, as well as deep neural networks.

PyTorch can be installed with pip like so:

module load py-pip/23.0
pip install torch torchvision torchaudio python-hostlist
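
A quick way to check the installation is to import torch and query its version and CUDA support (on a GPU node the last value should be True):

python -c "import torch; print(torch.__version__); print(torch.cuda.is_available())"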

The python-hostlist package installed above is needed to set up communication between the ranks when running multiple processes on multiple GPUs.
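
For example, python-hostlist can expand the compressed node list that Slurm provides (the node names below are only for illustration):

python -c "import hostlist; print(hostlist.expand_hostlist('falcon[4-6]'))"
# prints ['falcon4', 'falcon5', 'falcon6']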

Container Installation

One way to run PyTorch is to use containers. On HPC clusters we use Singularity for this, since it does not require root privileges. Singularity can be loaded as a module.

module load singularity

Using Singularity we can pull Docker containers, which are automatically converted to the Singularity format. Here we use the official NVIDIA PyTorch containers, which already include MPI and python-hostlist, both needed to run distributed code, so we do not have to install them on top of the container.

We also set the environment variables $SINGULARITY_CACHEDIR and $SINGULARITY_TMPDIR to redirect the temporary build data to a known location that we can remove afterwards.

mkdir tmp cache
export SINGULARITY_CACHEDIR=$PWD/cache
export SINGULARITY_TMPDIR=$PWD/tmp
singularity pull docker://nvcr.io/nvidia/pytorch:23.08-py3
rm -rf tmp cache
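
To check that the container works and can see the GPUs, you can run a quick test on a GPU node; a minimal check, assuming a single GPU is enough for the test:

srun -p gpu --gres gpu:tesla:1 singularity exec --nv pytorch_23.08-py3.sif python -c "import torch; print(torch.__version__, torch.cuda.is_available())"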

Code examples

There are many ways to distribute training in PyTorch, which you can read about here. One approach to avoid is DataParallel, because it is based on threading, and threading performs poorly in Python due to the Global Interpreter Lock. To use multiple GPUs efficiently we therefore have to use multiple tasks. The code below sets up the environment so that these tasks can communicate with each other: it defines a communication address and port and establishes the rank of each process.

pt_distr_env.py

import os
import hostlist


class DistributedEnviron():
    def __init__(self):
        self._setup_distr_env()
        self.master_addr = os.environ['MASTER_ADDR']
        self.master_port = os.environ['MASTER_PORT']
        self.world_size = int(os.environ['WORLD_SIZE'])
        self.rank = int(os.environ['RANK'])
        self.local_rank = int(os.environ['LOCAL_RANK'])

    def _setup_distr_env(self):
        hostnames = hostlist.expand_hostlist(os.environ['SLURM_JOB_NODELIST'])
        os.environ['MASTER_ADDR'] = hostnames[0]
        os.environ['MASTER_PORT'] = '39591'
        os.environ['WORLD_SIZE'] = os.environ['SLURM_NTASKS']
        os.environ['RANK'] = os.environ['SLURM_PROCID']
        os.environ['LOCAL_RANK'] = os.environ['SLURM_LOCALID']


if __name__ == '__main__':
    distr_env = DistributedEnviron()
    print('master addr :', distr_env.master_addr)
    print('master port :', distr_env.master_port)
    print('world size  :', distr_env.world_size)
    print('rank        :', distr_env.rank)
    print('local rank  :', distr_env.local_rank)
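
Thanks to the __main__ block, this script can also be run on its own to check that the Slurm variables are picked up correctly, for example with the same resources as the training run shown further below:

srun -p gpu -n 2 --gres gpu:tesla:2 -w falcon4,falcon5,falcon6 python pt_distr_env.py

Each task then prints its master address and port, world size, rank, and local rank.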

The code below uses the environment set up above and trains the model across multiple processes, each working on part of the data, using DistributedDataParallel.

cnn_distr.py

import os
import random
import time
import numpy as np
import torch
import torch.distributed as dist
import torch.nn.functional as F
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DataLoader, Dataset, DistributedSampler
from torchvision import models
from pt_distr_env import DistributedEnviron


num_warmup_epochs = 2
num_epochs = 5
batch_size_per_gpu = 128
num_iters = 25
model_name = 'resnet152'

# Set MASTER_ADDR/MASTER_PORT, WORLD_SIZE, RANK and LOCAL_RANK from Slurm,
# then initialize the NCCL process group.
distr_env = DistributedEnviron()
dist.init_process_group(backend="nccl")
world_size = dist.get_world_size()
rank = dist.get_rank()
device = distr_env.local_rank

model = getattr(models, model_name)()
model.to(device)

optimizer = optim.SGD(model.parameters(), lr=0.01)

# Synthetic dataset of random images and labels, sized so that each rank
# processes num_iters batches per epoch.
class SyntheticDataset(Dataset):
    def __getitem__(self, idx):
        data = torch.randn(3, 224, 224)
        target = random.randint(0, 999)
        return (data, target)

    def __len__(self):
        return batch_size_per_gpu * num_iters * world_size


ddp_model = DistributedDataParallel(model, device_ids=[device])

train_set = SyntheticDataset()
train_sampler = DistributedSampler(
    train_set,
    num_replicas=world_size,
    rank=rank,
    shuffle=False,
    seed=42
)
train_loader = DataLoader(
    train_set,
    batch_size=batch_size_per_gpu,
    shuffle=False,
    sampler=train_sampler,
    # num_workers=16
)


def benchmark_step(model, imgs, labels):
    optimizer.zero_grad()
    output = model(imgs.to(device))
    loss = F.cross_entropy(output, labels.to(device))
    loss.backward()
    optimizer.step()


# warmup
for epoch in range(num_warmup_epochs):
    for step, (imgs, labels) in enumerate(train_loader):
        benchmark_step(ddp_model, imgs, labels)

# benchmark
imgs_sec = []
for epoch in range(num_epochs):
    t0 = time.time()
    for step, (imgs, labels) in enumerate(train_loader):
        benchmark_step(ddp_model, imgs, labels)

    dt = time.time() - t0
    imgs_sec.append(batch_size_per_gpu * num_iters / dt)

    if rank == 0:
        print(f' * Rank {rank} - Epoch {epoch:2d}: '
              f'{imgs_sec[epoch]:.2f} images/sec per GPU')

imgs_sec_total = np.mean(imgs_sec) * world_size
if rank == 0:
    print(f' * Total average: {imgs_sec_total:.2f} images/sec')

You can run the code directly with srun:

srun -p gpu -n 2 --gres gpu:tesla:2 -w falcon4,falcon5,falcon6 python cnn_distr.py
# Or in the container
srun --mpi=pmi2 -p gpu -n 2 -N 1 --gres gpu:tesla:2 -w falcon4,falcon5,falcon6 singularity exec --nv pytorch_23.08-py3.sif python cnn_distr.py
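
For longer runs it can be more convenient to submit the job as a batch script. Below is a minimal sketch using the same partition, task count, and container as above; the job name and time limit are placeholders you should adjust:

#!/bin/bash
#SBATCH --job-name=pytorch-ddp
#SBATCH --partition=gpu
#SBATCH --nodes=1
#SBATCH --ntasks=2
#SBATCH --gres=gpu:tesla:2
#SBATCH --time=01:00:00

module load singularity
srun --mpi=pmi2 singularity exec --nv pytorch_23.08-py3.sif python cnn_distr.py

Save it, for example, as pytorch_ddp.sh and submit it with sbatch pytorch_ddp.sh.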

Keep in mind that the job might freeze if there is a mismatch between the GPU driver and the installed PyTorch version (and the CUDA version it was built against). You can set the environment variable NCCL_DEBUG=INFO before running the job to get diagnostic output in this case. For example, if you pick a PyTorch version built against CUDA 11.8 but the GPU driver only supports CUDA 11.4, the job will get stuck at the communication stage and never finish. The --mpi=pmi2 option selects the process management interface that srun uses to launch the tasks and let them find each other. Also, when running on multiple nodes you have to choose a homogeneous set of nodes (falcon4-6).
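
For example, to get the NCCL diagnostics you can export the variable before launching the job (Singularity passes the host environment into the container by default):

export NCCL_DEBUG=INFO
srun --mpi=pmi2 -p gpu -n 2 -N 1 --gres gpu:tesla:2 -w falcon4,falcon5,falcon6 singularity exec --nv pytorch_23.08-py3.sif python cnn_distr.py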