
Lab 7 - Introduction to Parallel Programming

Getting Started

As discussed in the lecture, we need an MPI implementation to compile and/or run distributed code. All clusters should have one available as a module, or even loaded by default. The MPI standard is defined for C, C++ and Fortran, but in this lab, for the sake of simplicity, we will use Python. The mpi4py package provides an MPI interface for Python that closely follows the standard bindings. It is used, for example, in DeepSpeed, an open-source deep learning optimization library for PyTorch.

Complete

  • Get MPICH
  • Get Python/3.9...
  • Get mpi4py

Verify

  • mpicc : Error: Command line argument is needed!
  • python -c "import mpi4py" : no output (the import succeeds without error)

One way to do this is below.

ml mpich/3.4.3
ml python/3.9.12
python3 -m pip install mpi4py

MPI Hello World

Do all of the following in a folder named lab7. Let's look at a simple 'hello world' program built to work with MPI. Below is the C++ implementation.

hello.cpp

#include <mpi.h> // Loads the MPI library
#include <stdio.h>

int main(int argc, char** argv){
        // Initialize the MPI communications
        MPI_Init(NULL, NULL);

        // Get rank of current process
        int rank;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);

        // Get the number of processes
        int world_size;
        MPI_Comm_size(MPI_COMM_WORLD, &world_size);

        printf("Hello word from rank %d of %d\n", rank, world_size);
        MPI_Finalize();
}
MPI programs always start with MPI_Init(), which initializes communication between the processes, and end with MPI_Finalize(), which closes this communication. Two of the arguably most important functions here are MPI_Comm_rank, which yields the position of the current process in the communication network, and MPI_Comm_size, which tells us how many processes there are in the specified group (MPI_COMM_WORLD in this case).

Complete

  • Compile this program with the MPI compiler to be called hello
  • Run the compiled program with 4 processes

Verify

Output:
Hello world from rank 3 of 4
Hello world from rank 1 of 4
Hello world from rank 2 of 4
Hello world from rank 0 of 4

Now, let's try to build the same thing in mpi4py. Below is the equivalent Python code; you just need to find the mpi4py syntax to assign values to comm, rank and world_size. For this and all of the following tasks, consult the documentation here: https://mpi4py.readthedocs.io/en/stable/.

hello.py

from mpi4py import MPI

comm = #TODO
rank = #TODO
world_size = #TODO

if rank == 0:
    print(comm)
print("Hello world, from rank {} of {}".format(rank, world_size))

Complete

  • Assign the needed variables
  • Run the program with the MPI launcher using 4 processes. The executable in this case is python hello.py, not just hello.py.
  • Save the output of the program with 4 MPI tasks into a file called Answer1.txt. You can do this easily by adding ">> Answer1.txt" to the command you execute the code with.

Verify

Answer1.txt holds:
Hello world, from rank 3 of 4
Hello world, from rank 1 of 4
Hello world, from rank 2 of 4
Hello world, from rank 0 of 4

MPI and Slurm

Now that we have executed MPI programs on the login nodes, we can also schedule them to run on compute nodes. Slurm and MPI work together in such a way that you do not have to pass launch parameters on the command line; instead you can set them with #SBATCH directives (the #SBATCH -n parameter tells MPI how many tasks to run). The program itself is also launched with srun instead of mpirun/mpiexec.

Complete

  • Create a batch file called batch.answer and make it run on the main partition, request 1 CPU per task (use --cpus-per-task), write the output to out.txt and run 4 tasks. Have it execute the hello.py script we finished in the last exercise.

What do you notice in the output? It should be different from the previous ones.

Complete

  • Have a look at https://slurm.schedmd.com/mpi_guide.html and the --mpi flag, and find what you need to add to get the correct output (hint: you can also find it in our docs).
  • Add the extra flag to the batch script.

Sending data

Next we are going to write an MPI program that on rank 0 creates an array of 10 elements (0 through 9), sends it to rank 1, and then prints it from rank 1. We talked in the lecture about how MPI programs are executed: essentially, all processes run the exact same code, but the rank variable provided by MPI is different. We can use this to structure our code so that different actions are performed based on the rank of the process. If we want to perform one set of actions on rank 0 (creating the elements and sending them), we can put them in an if rank == 0: block, and the other actions (receive and print) in a corresponding block for rank 1. You will just have to initialize the necessary variables beforehand and code the send and receive.
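
As a minimal sketch of this send/receive pattern (using a placeholder string rather than the array the task asks for):

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    # Rank 0 builds some data and sends it to rank 1
    message = "greetings from rank 0"
    comm.send(message, dest=1, tag=0)
elif rank == 1:
    # Rank 1 blocks here until the message from rank 0 arrives
    message = comm.recv(source=0, tag=0)
    print(message)

Run it with at least two processes, since rank 1 must exist to receive the message.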

Complete

Write the program described above and launch it with the correct number of processes. Name the code Answer3.py.

Verify

Output:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Matrix-vector product

This exercise might be a bit harder, which is why most of the program is already done for you. The program takes the matrix size as a command line argument. We divide the work up between the processes and generate matrix sections accordingly (with 2 processes you would split the matrix in half). We use the variables rank and size to do this flexibly; dividing data up according to such variables is a common procedure in distributed computing. All processes calculate the product of their section of the matrix using the matvec function. This function gathers the vector values from all ranks using Allgather and multiplies its smaller part of the matrix with the full vector.
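
If Allgather is new to you, the following standalone sketch (not part of the exercise) shows what it does: every rank contributes one value and every rank ends up with the full collection.

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Each rank contributes a single value: its own rank number
local = np.array([float(rank)], dtype='d')
gathered = np.zeros(size, dtype='d')

# After the Allgather every rank holds [0., 1., ..., size-1] in gathered
comm.Allgather([local, MPI.DOUBLE], [gathered, MPI.DOUBLE])
print("rank", rank, "sees", gathered)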

matvecproduct.py

from mpi4py import MPI
import numpy as np
import sys
import time

def matvec(comm, A, x):
    m = A.shape[0]  # local rows
    p = comm.Get_size()
    xg = np.zeros(m*p, dtype='d')
    comm.Allgather([x,  MPI.DOUBLE], # Allgather is in essence a gather + bcast: every rank ends up with the same data in its receive buffer
                [xg, MPI.DOUBLE])    # Here xg will hold, after the Allgather, the values of x from all ranks
    y = np.dot(A, xg)
    return y

if __name__ == "__main__":
    curr = time.time() # We will measure the time it takes to run this program

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    # Let's set up a matrix size depending on our input (nxn matrix) and calculate the local slice based on number of processes
    matrix_size = int(sys.argv[1])  # Takes command line argument for matrix size
    local_matrix_size =        # TODO calculate local size

    # Let's generate arrays for our local variables A and x. To calculate the dimensions you should use matrix_size and local_matrix_size. Hint: np.random.rand
    local_A =                  # TODO  This is a matrix
    local_x =                  # TODO  This is a vector

    # Calculate the local matrix and vector product
    local_result =             # TODO

    # We create a global_result array for rank 0 to gather our result in. Other ranks will not need this array
    if rank == 0:
        global_result = np.empty(matrix_size, dtype='d')
    else:
        global_result = None

    # TODO Lastly we will need to gather all the local results to rank 0 and then we have the result
    # comm.

    # You can uncomment these to see the local results
    # print("Local A (rank {}):".format(rank))
    # print(local_A)
    # print("Local x (rank {}):".format(rank))
    # print(local_x)
    if rank == 0:
        end = time.time()
        print(end-curr)

        print("Global result:")
        print(global_result)

Complete

  • Finish the TODO sections of the program above. Don't change anything else.
  • When you're done, rename the file holding the program to Answer4.py; nagios will run checks on the TODO sections.

Verify

Output should be something like the following:
0.0005207061767578125
Global result:
[1.08595585 1.67289166 1.11922498 2.08364315 1.50178143 1.66818505]

Scaling

Now that we have a working program, we can observe how parallelizing it affects performance. We talked in the lecture about communication overhead and how not having enough work that can run in parallel makes a program inefficient. If we compare the results of using a matrix size of 50 with 50 processes and with 1 process, we see the following:
matrix size: 50, processes: 50 - 0.04
matrix size: 50, processes: 1 - 0.0001
This is not an averaged trial, but the slowdown caused by overhead is still clearly visible. At small problem sizes it dominates, while at bigger problem sizes the overhead is compensated for.

Complete

  • Look at a matrix size of 256. What is the maximum number of processes you can use? Why?
  • Look at a matrix size of 50000. What is the minimum number of processes you can use? Why?
  • Try different process counts for the problem with size 25000. Calculate the speedup and efficiency for process counts of (1, 5, 25, 50); see the sketch below for the definitions.
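
Speedup is the single-process runtime divided by the p-process runtime, and efficiency is the speedup divided by p. A small sketch of the calculation, with made-up timings that you should replace with your own measurements:

# Hypothetical timings in seconds, keyed by process count; replace with your measurements
timings = {1: 2.0, 5: 0.5, 25: 0.2, 50: 0.15}

t1 = timings[1]
for p, tp in timings.items():
    speedup = t1 / tp          # S(p) = T(1) / T(p)
    efficiency = speedup / p   # E(p) = S(p) / p
    print(f"p={p}: speedup={speedup:.2f}, efficiency={efficiency:.2f}")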

Algebra subroutines

Below you will find a pure Python program which operates on 1000 matrices of size 200x200. For each matrix it finds the corresponding transposed matrix, multiplies it with the original one, and finally finds the determinant of the product. The result is the sum of all of these determinants. Run by itself, this program takes a really long time. See if you can use optimized libraries (numpy, scipy) to speed the program up, and also divide the work across 2 processes using MPI. The program should eventually run in under 10 seconds. Since this takes some testing, I recommend launching an interactive session; then you can also try out different process counts. To get the sum in the end, find the MPI operation that sums the variable provided across all ranks.
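
A minimal sketch of the overall pattern on toy data (not the matrices below): each rank handles a strided share of the iterations, and the per-rank partial sums are then combined into a single value. comm.reduce with MPI.SUM is one way to do the combining; check the documentation for the variant that fits your program best.

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Each rank processes every size-th iteration, starting from its own rank
partial = 0.0
for i in range(rank, 100, size):
    partial += i * i

# Combine the per-rank partial sums on rank 0
total = comm.reduce(partial, op=MPI.SUM, root=0)
if rank == 0:
    print(total)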

matrixops.py

import random
import time
# Start the timer
start_time = time.time()

N = 200
num_matrices = 1000

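# Build an N x N matrix of random floats as nested Python lists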
def generate_matrix(N):
    return [[random.random() for _ in range(N)] for _ in range(N)]

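# Transpose a nested-list matrix by swapping rows and columns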
def transpose_matrix(matrix):
    return [[row[i] for row in matrix] for i in range(len(matrix[0]))]

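# Naive triple-loop matrix multiplication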
def multiply_matrix(matrix1, matrix2):
    res = [[0 for x in range(len(matrix1[0]))] for y in range(len(matrix1))]
    for i in range(len(matrix1)):
        for j in range(len(matrix2[0])):
            for k in range(len(matrix2)):
                res[i][j] += matrix1[i][k] * matrix2[k][j]
    return res

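# Minor of m obtained by removing row i and column j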
def getMatrixMinor(m,i,j):
    return [row[:j] + row[j+1:] for row in (m[:i]+m[i+1:])]

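# Determinant via recursive Laplace (cofactor) expansion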
def getMatrixDeterminant(m):
    if len(m) == 2:
        return m[0][0]*m[1][1]-m[0][1]*m[1][0]

    determinant = 0
    for c in range(len(m)):
        determinant += ((-1)**c)*m[0][c]*getMatrixDeterminant(getMatrixMinor(m,0,c))
    return determinant

det_sum = 0

for _ in range(num_matrices):
    matrix = generate_matrix(N)
    transposed_matrix = transpose_matrix(matrix)
    result_matrix = multiply_matrix(matrix, transposed_matrix)
    matrix_det = getMatrixDeterminant(result_matrix)
    det_sum += matrix_det

print(det_sum)

# Calculate and print the elapsed time
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")

Complete

  • Use optimized libraries and MPI to get the code to run in under a second
  • Use one MPI operation to retrieve the sum of det_sum across all ranks

Distributing your code only gets you so far, and hardware is not getting much faster anymore. The key to fast programs really is writing the program efficiently. This is a small program, so as you probably noticed, increasing the number of processes starts hurting performance quite quickly, likely due both to memory limitations and to time spent on communication. To recognize these and other issues in your own code later, you will have to profile it, which unfortunately doesn't fit into the scope of this course, but there are many materials to be found online.

*MPI + OpenMP, experiment with bindings

Below is a really basic example of MPI combined with OpenMP. If you are interested in trying out bindings and distribution, you can do it with this code; read more here.

Complete

  • Compile the program. Make sure to enable OpenMP during compilation and use the MPI compiler.
  • Using mpich/3.4.3, you will need gcc to run it right now.
  • Try different threading options and see if you can get 2 or more threads assigned to the same core when running with srun. Why is this not possible on the login nodes? Why does it look like you get more cores than you allocate? Check the slides for how to specify how many threads OpenMP creates.

#include <stdio.h>
#include <omp.h>
#include <sched.h>
#include <iostream>
#include <mpi.h>

int main() {
        MPI_Init(NULL, NULL);

        int world_size;
        MPI_Comm_size(MPI_COMM_WORLD, &world_size);

        int world_rank;
        MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

        int num = 8;
        int data[num];

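        // Each MPI rank spawns its own team of OpenMP threads; every thread reports the core it is running on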
        #pragma omp parallel
        {
            int threadID = omp_get_thread_num();
            int coreID = sched_getcpu();

            #pragma omp critical
            {
                std::cout << "Rank " << world_rank << "| Thread " << threadID << "| is running on CPU core " << coreID << std::endl;
            }
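            // Split the loop iterations across the threads and record which thread handled each index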
            #pragma omp for
            for (int i = 0; i < num; i++){
                    data[i] = omp_get_thread_num();
            }
        }
        for (int i = 0; i < num; i++){
                printf("index %d, thread %d \n",i, data[i]);
        }
        MPI_Finalize();
        return 0;
}