Get-started with MPI
Contents
Get-started with MPI
Applications
Distributed Summation
To demonstrate the usage of MPI collective calls, we implement the parallel summation of:
This is an over-simplied example, but its computation pattern (i.e., task decomposition and result reduction)
is frequently used in parallel computation.
The code can be downloaded at app-distributed-sum.cpp.
We start with defining the “tasks” - each process is responsible for the summation in
a subrange of [0, N). We compute the subrange boundaries at the root process,
and scatter them to all processes:
typedef std::array<int, 2> range_t;
range_t range;
vector<range_t> ranges;
if( rank == 0 ) {
int N = 10000, N_each = N / n_procs;
for(int i=0; i<n_procs; ++i)
ranges.push_back({i*N_each, (i+1)*N_each});
}
comm.scatter(ranges.data(), range, 0);
Here, each range boundary is represented by a pair of integers. In more general
applications, task-specification may be described by structural object. The
communication calls may correspondingly
involve sequence of char or user-defined datatypes.
After knowing its task, each process perform the local summation. The local
results are then sum- reduce to the root process and get printed:
auto [b,e] = range;
int local_sum = 0, sum;
for(int i=b; i<e; ++i) local_sum += i;
comm.reduce(local_sum, sum, SUM, 0);
if( rank == 0 )
pout << "Sum = ", sum, endl;
The output is:
Sum = 49995000
Computing PI by Numerical Integration
The mathematical constant \(\pi\) can be approximately evaluated by a numerical integration:
We use trapezoid method to approximate it, i.e., the integration interval \([0,\,1]\) is divided into \(N\) sub intervals with equal length \(h=1/N\). The location of \(i\)-th interval is \(x_i = (i+0.5)h\) (0-indexed). The approximated integration in this interval is \(f(x_i)h\), where \(f=4/(1+x^2)\) is the integrand.
This is an typical embarassing parallel computation task. We implement it in the following.
This example is taken from Ch-3.3 of [GroppW-UMPIv3]. The code can be downloaded at
app-pi-computation.cpp.
The process ranked rank is responsible for the intervals indexed rank, rank+n_procs, rank+2*n_procs, ....
The local integration value is simply evaluated by:
int N = 10000;
double h = 1.0 / N, sum = 0.;
for(int i=rank; i<N; i+=n_procs){
double x = h * (i+0.5);
sum += 1. / ( 1. + x*x );
}
double my_pi = 4.0 * h * sum;
All local results are reduced by SUM operator to a single final value, which
is printed finally:
double pi;
comm.reduce(my_pi, pi, SUM, 0);
if( rank == 0 )
pout << "Find PI = ", pi, endl;
The output is:
Find PI = 3.14159
A Self-Scheduling Example: Matrix-Vector Multiplication
In the following example, we use multiple processes to compute the multiplication of a matrix \(A\) and a vector \(b\),
whose shapes are \((n_{\rm rows},\, n_{\rm cols})\) and \((n_{\rm cols},\,1)\).
The result is a vector \(c\) of shape \((n_{\rm rows},\,1)\).
This example can be found in Ch3.6 of [GroppW-UMPIv3].
The codes can be found at app-matrix-vec-dot.cpp.
The matrix-vector multiplication is defined as, for each row \(i\),
The simplest task-decomposition scheme is to distribute rows of \(A\) among processes and let each process compute on a subset of rows.
In real applications, sub-tasks may consum various amount of times. To balance
the works on processes, self-scheduling algorithm is necessary. In this example,
we use a single master process (also called root process, ranked root=n_procs-1)
to control and assign tasks to other worker processes (ranked in [0, n_procs-1)).
The steps for this algorithm are
The master broadcasts all shared data to workers.
The master assigns each worker a initial task.
The worker waits for a job, finishes it, and replies the answer to the master.
The master waits for reply. Once he gets a reply, stores the answer and sends another task to the worker.
If all tasks finished, the master sends a stop signal to each worker.
Here, we use HIPP’s DArray to describe these numerical arrays:
#include <hippnumerical.h>
using HIPP::NUMERICAL::DArray;
typedef DArray<double, 2> arr_t; // matrix
typedef DArray<double, 1> vec_t; // vector
To implement the algorithm, we begin with defining the algorithm of a worker:
// if rank != root
vec_t a({n_cols}), b({n_cols});
comm.bcast(b, root);
while(1) {
int tag = comm.recv(root, ANY_TAG, a).tag();
if( tag == n_rows ) break;
double res = (a*b).sum();
comm.send(root, tag, res);
}
Here, the worker first gets the shared data, i.e., \(b\), then waits
the assignment repeatedly. Each assignment is described by a row \(a\) of
\(A\). The row number is described by the communication tag for clarity.
The stop signal comes simply when the row number approaches the actual number
of rows of \(A\). Once get the tasks specification, the worker finishes
the job (i.e., by a vector-vector inner product (a*b).sum()) and sends
back the result.
The master process has to first initialize the shared data, \(b\), broadcasts
them to workers. The matrix \(A\) is also initialized, but gets sent later.
Here we simply set them to constant 1.0:
// if rank == root
arr_t A({n_rows, n_cols}, 1.0);
vec_t b({n_cols}, 1.0), c({n_rows});
comm.bcast(b, root);
The master assigns initial tasks to workers by sending the first n_workers
rows to them, respectively:
int row = 0;
while(row < n_workers && row < n_rows) {
comm.send(row, row, &A(row, 0), n_cols); // send initial rows to workers
++ row;
}
Then, the master waits for reply, stores the replied answer, and sends the
next available row to worker. Finally, empty massages with tag = n_rows
are sent as stop signal:
double res;
for(int i=0; i<n_rows; ++i){
Status st = comm.recv(ANY_SOURCE, ANY_TAG, res);
int src = st.source(), tag = st.tag();
c[tag] = res;
if( row != n_rows ){ // send another row of A
comm.send(src, row, &A(row,0), n_cols);
++row;
}else {
comm.send(src, row); // an empty message
}
}
pout << "c = A * b = ", c, endl;
Taking a small \(8 \times 16\) matrix \(A\) as example, the printed answer is:
c = A * b = DArray{16,16,16,16,16,16,16,16}