PyTorch on Rocket¶
Introduction¶
PyTorch is an open source Python package that provides tensor computation, like numpy, with GPU acceleration and deep neural networks.
PyTorch can be installed with pip like so:
module load py-pip/23.0
pip install torch torchvision torchaudio python-hostlist
Python-hostlist here is necessary to facilitate communication between ranks when running multiple processes on multiple GPUs.
Container Installation¶
One of ways to get PyTorch to run is to use containers. On HPC clusters we use Singularity for this, since it doesn't require root privileges. Singularity can be loaded as a module.
module load singularity
We also set the environment variable $SINGULARITY_TMPDIR and $SINGULARITY_CACHEDIR to direct unneeded data to a familiar place, which we can get rid of later.
mkdir tmp cache
export SINGULARITY_CACHEDIR=$PWD/cache
export SINGULARITY_TMPDIR=$PWD/tmp
singularity pull docker://nvcr.io/nvidia/pytorch:23.08-py3
rm -rf tmp cache
Code examples¶
There are many ways to distribute training in PyTorch, which you can read about here. One way to not do this is DataParallel, because it uses threading and threading has really poor performance due to the underlying mechanics of Python. So if we want to use multiple GPUs efficiently we will have to use multiple tasks. Below is code that will set up the environment, so that these different tasks that are started can communicate with each other. It essentially sets up a communicator port and establishes the rank of each process.
pt_distr_env.py¶
import os
import hostlist
class DistributedEnviron():
def __init__(self):
self._setup_distr_env()
self.master_addr = os.environ['MASTER_ADDR']
self.master_port = os.environ['MASTER_PORT']
self.world_size = int(os.environ['WORLD_SIZE'])
self.rank = int(os.environ['RANK'])
self.local_rank = int(os.environ['LOCAL_RANK'])
def _setup_distr_env(self):
hostnames = hostlist.expand_hostlist(os.environ['SLURM_JOB_NODELIST'])
os.environ['MASTER_ADDR'] = hostnames[0]
os.environ['MASTER_PORT'] = '39591'
os.environ['WORLD_SIZE'] = os.environ['SLURM_NTASKS']
os.environ['RANK'] = os.environ['SLURM_PROCID']
os.environ['LOCAL_RANK'] = os.environ['SLURM_LOCALID']
if __name__ == '__main__':
distr_env = DistributedEnviron()
print('master addr :', distr_env.master_addr)
print('master port :', distr_env.master_port)
print('world size :', distr_env.world_size)
print('rank :', distr_env.rank)
print('local rank :', distr_env.local_rank)
The code below will use the options specified above and start multiple processes that work on the Data in parallel using the DistributedDataParallel method.
cnn_distr.py¶
import os
import random
import time
import numpy as np
import torch
import torch.distributed as dist
import torch.nn.functional as F
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import DataLoader, Dataset, DistributedSampler
from torchvision import models
from pt_distr_env import DistributedEnviron
num_warmup_epochs = 2
num_epochs = 5
batch_size_per_gpu = 128
num_iters = 25
model_name = 'resnet152'
distr_env = DistributedEnviron()
dist.init_process_group(backend="nccl")
world_size = dist.get_world_size()
rank = dist.get_rank()
device = distr_env.local_rank
model = getattr(models, model_name)()
model.to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01)
class SyntheticDataset(Dataset):
def __getitem__(self, idx):
data = torch.randn(3, 224, 224)
target = random.randint(0, 999)
return (data, target)
def __len__(self):
return batch_size_per_gpu * num_iters * world_size
ddp_model = DistributedDataParallel(model, device_ids=[device])
train_set = SyntheticDataset()
train_sampler = DistributedSampler(
train_set,
num_replicas=world_size,
rank=rank,
shuffle=False,
seed=42
)
train_loader = DataLoader(
train_set,
batch_size=batch_size_per_gpu,
shuffle=False,
sampler=train_sampler,
# num_workers=16
)
def benchmark_step(model, imgs, labels):
optimizer.zero_grad()
output = model(imgs.to(device))
loss = F.cross_entropy(output, labels.to(device))
loss.backward()
optimizer.step()
# warmup
for epoch in range(num_warmup_epochs):
for step, (imgs, labels) in enumerate(train_loader):
benchmark_step(ddp_model, imgs, labels)
# benchmark
imgs_sec = []
for epoch in range(num_epochs):
t0 = time.time()
for step, (imgs, labels) in enumerate(train_loader):
benchmark_step(ddp_model, imgs, labels)
dt = time.time() - t0
imgs_sec.append(batch_size_per_gpu * num_iters / dt)
if rank == 0:
print(f' * Rank {rank} - Epoch {epoch:2d}: '
f'{imgs_sec[epoch]:.2f} images/sec per GPU')
imgs_sec_total = np.mean(imgs_sec) * world_size
if rank == 0:
print(f' * Total average: {imgs_sec_total:.2f} images/sec')
You can run the code simply as
srun -p gpu -n 2 --gres gpu:tesla:2 -w falcon4,falcon5,falcon6 python cnn_distr.py
#### Or in the container
srun --mpi=pmi2 -p gpu -n 2 -N 1 --gres gpu:tesla:2 -w falcon4,falcon5,falcon6 singularity exec --nv pytorch_23.08-py3.sif python cnn_distr.py
You will need to keep in mind that this might freeze if you have a mismatch of GPU drivers and the installed PyTorch version(and the CUDA version that it leverages). You can set the environment variable NCCL_DEBUG=INFO
before running the job to get output here. For example if you pick a PyTorch version that uses CUDA 11.8, but the GPU drivers are version 11.4, the job will get stuck at the communication level and never finish. The --mpi=pmi2
sets the communication method that different PyTorch processes will use. Also when running on multiple nodes you have to choose a homogeneous setup(falcon4-6).