4.3. PARALLEL LANGUAGES


4.2. MESSAGE-PASSING LIBRARIES

Message-passing libraries directly implement the message-passing parallel programming model and allow programmers to explicitly write message-passing programs. The basic paradigm of message passing implemented in different libraries (PARMACS, Chameleon, CHIMP, PICL, Zipcode, p4, PVM, MPI, etc.) is the same and was briefly summarized in Section 4.1. The libraries differ in details of implementation of the same basic model. The most popular libraries are MPI (Message-Passing Interface) and PVM (Parallel Virtual Machine). Absolute majority of existing message-passing code is written using one of the libraries.

In this book we outline MPI. The main reason we focus on this library is that it has been standardized (in 1995) as MPI 1.1 and widely implemented in compliance with the standard. Practically all hardware vendors offer MPI. In addition there exist free high-quality MPI implementations such as LAM MPI from Ohio Supercomputing Center (currently supported by Open System Laboratory at Indiana University) and MPICH from Argonne National Laboratory and Mississippi State University. MPI supports parallel programming in C and Fortran on all MPP architectures, including Unix and Windows NT platforms.

Some extensions to MPI 1.1, known as MPI 2.0, were released in 1997. The extensions include Fortran 90 and C+ bindings, parallel I/O, one-sided communications, etc. A typical MPI library fully implements MPI 1.1 and optionally supports some features of MPI 2.0. Our introduction into MPI will be based on the C interface to MPI 1.1.

4.2.1. Basic MPI Programming Model

An MPI program consists of a fixed number of processes, executing their own code in their own address space. The codes executed by each process do not need to be identical. The processes communicate via calls to MPI communication primitives.

MPI does not provide mechanisms to specify the number of processes of an MPI program, the code to be executed by each of the processes, and the allocation of the processes to physical processors. Such mechanisms are external to the MPI program and must be provided by particular MPI implementations. Instead MPI provides inquiring operations allowing the processes of the MPI program to determine their total number and identify themselves in the program.

MPI provides two types of communication operation: point-to-point and collective operations. A point-to-point operation involves two processes, one of which sends a message and other receives the message. A collective operation, such as barrier synchronization and broadcast, involves a group of processes.

In MPI a process group is defined as an ordered collection of processes, each with a rank. Groups define the scope of collective communication operations. That allows collective operations to avoid unnecessarily synchronizing uninvolved processes (potentially running unrelated code). In point-_to-point communication operations, groups define a scope for process names; that is, participating processes are specified by their rank in the same process group.

MPI does not provide a mechanism to build a group from scratch, but only from other, previously defined groups. There are operations to construct new groups by subsetting and supersetting existing groups. Immediately after MPI is initialized, only one group consisting of all processes that make up the MPI program is available. This group is the base group upon which all other groups are defined.

MPI provides a mechanism that allows the programmer to safely separate messages that do not have to be logically mixed, even when the messages are transferred between processes of the same group. The mechanism is called communicators. Logically a communicator may be seen as a separate communication layer associated with a group of processes. There may be several communicators associated with the same group, providing nonintersecting communication layers. Communication operations explicitly specify the communicator, which will be used to transmit messages. Messages transmitted over different communicators cannot be mixed; that is, a message sent through a communicator is never received by a process not communicating over the communicator.

Technically the creation of a communicator consists in generation of a unique tag that becomes attached to all messages sent through the communicator and used by processes to filter incoming messages. The tag is generated at runtime and shared by all processes of the group, with which the communicator is associated.

This feature of MPI is important. It is the very feature that allows the programmer to use MPI for writing parallel libraries. In other words, the programmer can write an MPI subprogram that can be safely used by other programmers in their MPI programs without any knowledge of the details of its implementation. In contrast, the other popular message-passing library, PVM, does not have the capacity to separate safely communication layers for message passing, and therefore it cannot be used for implementation of parallel libraries. The point is that the only unique attribute characterizing a PVM process is its ID assigned at runtime to each process of the PVM program. All other communication attributes, which could be used to separate messages, such as groups and tags, are user-defined. Therefore they do not have to be unique at runtime, especially if different modules of the program are written by different programmers.

To see more precisely why the PVM approach does not allow the programmer to safely localize communications inside a parallel module, consider the following message-passing pseudocode:

 extern Proc(); if(my process ID is A)    Send message M with tag T to process B Proc(); if(my process ID is B)    Receive a message with tag T from process A

The code is supposed to ensure the following scenario. First, the process A sends a message M to the process B. Then all processes of the message-passing program, including processes A and B, execute the externally defined parallel procedure Proc. Then, after the process B has left the procedure Proc, it receives the message M. This desirable semantics of the program may be broken if the procedure Proc performs a point-to-point communication operation also sending a message from the process A to the process B. In that case the message M sent by the process A from the outside of the procedure Proc may be intercepted inside this procedure by the process B. Correspondingly the message sent by the process A from the inside of the procedure Proc will be received by the process B outside. The user-defined tag T cannot solve the problem of matching the sending and receiving operations, since there is no guarantee that the programmer who coded the procedure Proc has not used the same tag.

The mechanism of communicators provided by MPI easily solves the problem, allowing for the following safe implementation of the program above:

 extern Proc (); Create communicator C for a group including processes A and B if(my process ID is A)    Send message M to process B through communicator C Proc(); if(my process ID is B)    Receive a message from process A through communicator C

A unique tag attached to any message sent over the communicator C prevents the interception of the message inside the procedure Proc, since a communication operation sending or receiving messages with the same tag cannot occur in this procedure.

The following sections present in more detail the operations on groups and communicators, point-to-point and collective communication operations, and operations for MPI environmental management.

4.2.2. Groups and Communicators

A group is an ordered set of processes. Each process in a group is associated with an integer rank. Ranks are contiguous and start from zero. Groups are represented by opaque group objects of the type MPI_Group, and hence cannot be directly transferred from one process to another.

A context is a unique, system-generated tag that differentiates messages. The MPI system manages this differentiation process.

A communicator brings together the concepts of group and context. MPI communication operations reference communicators to determine the scope and the “communication universe” in which a point-to-point or collective operation is to operate. A communicator contains an instance of a group, a context for point-to-point communication, and a context for collective communication.

The group associated with the communicator always includes the local process. The source and destination of a message is identified by process rank within that group. For collective communication, the communicator specifies the set of processes that participate in the collective operation. Thus the communicator restricts the “spatial” scope of communication, and provides machine-independent process addressing through ranks. Communicators are represented by opaque communicator objects of the type MPI_Comm, and hence they cannot be directly transferred from one process to another.

Communicators described above are also called intracommunicators. They are used for point-to-point communication between processes of the same group and for collective communication. In addition to intracommunicators, MPI introduces intercommunicators used specifically for point-to-point communication between processes of different groups. We do not consider those communication operations in the book, and hence only deal with intracommunicators.

An initial pre-defined communicator MPI_COMM_WORLD is a communicator of all processes making up the MPI program. This communicator has the same value in all processes. The base group, upon which all other groups are defined, is the group associated with the initial communicator MPI_COMM_WORLD. This group does not appear as a pre-defined constant, but it may be accessed using the function

 int MPI_Comm_group(MPI_Comm comm, MPI_Group *group),

which returns in group a handle to the group of comm.

Other group constructors provided by MPI either explicitly list the processes of an existing group, which make up a new group, or do set-like binary operations (union, intersection, and difference) on existing groups to construct a new group.

The function

 int MPI_Group_incl(MPI_Group group, int n, int *ranks,    MPI_Group *newgroup)

creates a group newgroup that consists of the n processes in group with ranks rank[0],. . ., rank[n-1]. The process with rank k in newgroup is the process with rank rank[k] in group.

The function

 int MPI_Group_excl(MPI_Group group, int n, int *ranks,    MPI_Group *newgroup)

creates a group newgroup that is obtained by deleting from group those processes with ranks rank[0],. . ., rank[n-1]. The ordering of processes in newgroup is identical to the ordering in group.

The function

 int MPI_Group_range_incl(MPI_Group group, int n.    int ranges[][3], MPI_Group *newgroup)

assumes that ranges consist of triplets

and constructs a group newgroup consisting of processes in group with ranks

click to expand

We may have fi > li, and si may be negative but cannot be zero. The process with rank k in newgroup is the process with kth rank in the list of ranks above.

The function

 int MPI_Group_range_excl(MPI_Group group, int n.    int ranges[][3], MPI_Group *newgroup)

constructs newgroup by deleting from group those processes with ranks

click to expand

The ordering of processes in newgroup is identical to the ordering in group.

The function

 int MPI_Group_union(MPI_Group group1, MPI_Group group2,    MPI_Group newgroup)

creates a group newgroup that consists of all processes of group1, followed by all processes of group2. The function

 int MPI_Group_intersection(MPI_Group group1,     MPI_Group group2, MPI_Group newgroup)

creates a group newgroup that consists of all processes of group1 that are also in group2, ordered as in group1. The function

 int MPI_Group_difference(MPI_Group group1, MPI_Group group2,     MPI_Group newgroup)

creates a group newgroup that consists of all processes of group1 that are not in group2, ordered as in group1.

Note that for these three set-like operations the order of processes in the output group is determined primarily by the order in the first group and then, if necessary, by the order in the second group. Therefore neither union nor intersection is commutative, but both are associative.

Like groups, new communicators are created from previously defined communicators. Unlike group constructors, which are local operations, communicator constructors are collective operations that must be performed by all processes in the group associated with the existing communicator, which is used for creation of a new communicator.

The function

 int MPI_Comm_dup(MPI_Comm comm,  MPI_Comm *newcomm)

creates a communicator newcomm with the same group but with a new context.

The function

 int MPI_Comm_create(MPI_Comm comm, MPI_Group group,    MPI_Comm *newcomm)

creates a communicator newcomm with associated group defined by group and a new context. The function returns MPI_COMM_NULL to processes that are not in group. The call is to be executed by all processes in comm, even if they do not belong to the new group. All group arguments must have the same value (in particular, this is guaranteed if all processes build group exactly in the same way), and group must be a subset of the group associated with comm.

The function

 int MPI_Comm_split(MPI_Comm comm, int color, int key,    MPI_Comm *newcomm)

partitions the group associated with comm into disjoint subgroups, one for each nonnegative value of color. Each subgroup contains all processes of the same color. Within each subgroup the processes are ranked in the order defined by the value of the argument key. If, for a fixed color, the keys are not unique, the operation will order processes that supply the same key according to their rank in the parent group. A new communicator is created for each subgroup and returned in newcomm. A process may supply the color value MPI_UNDEFINED, in which case newcomm returns MPI_COMM_NULL.

There are two local operations allowing any member of the group associated with a communicator to determine its rank in the group and the total number of processes in the group. The function

 int MPI_Comm_size(MPI_Comm comm, int *size)

returns in size the number of processes in the group of comm. The function

 int MPI_Comm_rank(MPI_Comm comm, int *rank)

returns in rank the rank of the calling processes in the group of comm.

In the MPI program below, each process first determines the total number of processes executing the program and its rank in the global group associated with MPI_COMM_WORLD. Then two new communicators are created: one containing processes with even global ranks, and the other containing processes with odd global ranks. Next each process determines its local rank in the group associated with one of the newly created communicators. The source code of the program is

#include <mpi.h> int main(int argc, char **argv) {    int my_global_rank, my_rank_in_a, my_rank_in_b;    int global_size, size_a, size_b;    MPI_Comm comm_a, comm_b;    MPI_Init(&argc, &argv);    MPI_Comm_size(MPI_COMM_WORLD, &global_size);    MPI_Comm_rank(MPI_COMM_WORLD, &my_global_rank);    MPI_Comm_split(MPI_COMM_WORLD,          my_global_rank%2 ? 0 : MPI_UNDEFINED,          my_global_rank, &comm_a);    MPI_Comm_split(MPI_COMM_WORLD,          my_global_rank%2 ? MPI_UNDEFINED : 0,          my_global_rank, &comm_b);    if(comm_a != MPI_COMM_NULL) {       MPI_Comm_size(comm_a, &size_a);       MPI_Comm_rank(comm_a, &my_rank_in_a);    }    if(comm_b != MPI_COMM_NULL) {       MPI_Comm_size(comm_b, &size_b);       MPI_Comm_rank(comm_b, &my_rank_in_b);    }    ._._.    if(comm_a != MPI_COMM_NULL)       MPI_Comm_free(&comm_a);    if(comm_b != MPI_COMM_NULL)       MPI_Comm_free(&comm_b);    MPI_Finalize(); }

Note that in this program all MPI library functions are called after MPI_Init and before MPI_Finalize. This is a general rule. The function

 int MPI_Init(int *argc, char ***argv) 

initializes the MPI environment and must be called by all processes of the program before any other MPI function is called. The function must be called at most once; subsequent calls are erroneous. The function

 int MPI_Finalize(void)

cleans up all MPI state. Once the function is called, no MPI function (even MPI_Init) may be called.

The program above also uses group and communicator destructors. The operations are collective. They mark the corresponding group or communicator object for deallocation. Any ongoing operations that use this object will complete normally; the object is actually deallocated only if there are no other active references to it. The function

 int MPI_Comm_free(MPI_Comm *comm)

marks the communication object for dealocation. The handle is set to MPI_COMM_NULL. The function

 int MPI_Group_free(MPI_Group *group)

marks the group object for deallocation. The handle is set to MPI_GROUP_NULL.

4.2.3. Point-to-Point Communication

Point-to-point communication operations are the basic MPI communication mechanism. MPI provides a wide range of send and receive operations allowing the programmer to implement different modes of point-to-point communication such as blocking and nonblocking and synchronous and asynchronous.

Among all this diversity, there are two basic operations that are predominantly used in MPI applications. The two operations implement a clear and reliable model of point-to-point communication and allow the programmers to write portable MPI code. These two operations are a blocking send and a blocking receive.

The function

 int MPI_Send(void *buf, int n, MPI_Datatype datatype,    int dest, int tag, MPI_Comm comm)

implements a standard blocking send operation. The operation forms a message and sends it to the addressee.

The message consists of a data to be transferred and an envelope into which the data are put. The data part of the message may be empty (n=0). In that case the message consists only of the message envelope. The message envelope is a fixed part of message that carries information used to distinguish messages and selectively receive them. This information includes the message source, the message destination, the message tag, and the communicator.

The message source is implicitly determined by the identity of the message sender. The other fields are specified by arguments in the send operation.

The comm argument specifies the communicator that is used for the send operation. The message destination is specified by the dest argument as a rank of the destination process within the group associated with the communicator specified by comm.

The integer-valued message tag is specified by the tag argument. This integer can be used by the program to distinguish different types of messages. A valid tag value is nonnegative.

The data part of the message consists of a sequence of n values of the type specified by datatype. These values are taken from a send buffer, which consists of n entries of the type specified by datatype, starting at the entry at address buf. The argument datatype can specify a basic datatype, which corresponds to one of the basic datatypes of the C language, or a derived datatype, which is constructed from basic ones using datatype constructors provided by MPI.

In general, a datatype is an opaque object of the type MPI_Datatype. MPI provides pre-defined constants of that type for the basic datatypes such as MPI_CHAR (corresponds to signed char), MPI_SHORT (signed short int), MPI_INT (signed int), and MPI_FLOAT (float). Using basic datatypes can specify only contiguous buffers containing a sequence of elements of the same basic type. More general communication buffers are specified by using derived datatypes.

A derived datatype is constructed from basic datatypes and specifies two things:

  • A sequence of basic datatypes.

  • A sequence of integer displacements.

The displacements are not required to be positive, distinct, or in increasing order. Therefore the order of items need not coincide with their order in store, and an item may appear more than once. All methods of constructing derived datatypes can be applied recursively. Thus the derived datatypes can specify buffers that contain different datatypes (e.g., an integer count, followed by a sequence of real numbers) and noncontiguous buffers (e.g., a buffer containing a diagonal of a matrix).

A derived datatype is characterized by a sequence of pairs of the form

{(type0,disp0),. . .,(typen-1,dispn-1)},

where typei are basic types, and dispi are displacements. This sequence is called a type map. The sequence of basic datatypes (displacements are ignored)

{type0,.  . .,typen-1}

is called the type signature of the datatype. This type map, together with a base address buf , specifies a communication buffer that consists of n entries, where the ith entry is at address buf+dispi and has type typei. A data message assembled from such a communication buffer will consist of n values of the types defined by the type signature above. The extent of the datatype is defined to be the span from the first byte to the last byte occupied by entries in the datatype, and rounded up to satisfy alignment requirements.

A basic datatype can be characterized by a type map with one entry of the corresponding basic type and displacement zero. For example, the type map for MPI_INT is {(int,0)}.

The simplest datatype constructor allows replication of a datatype into contiguous locations. It is implemented by the function

 int MPI_Type_contiguous(int count, MPI_Datatype oldtype,     MPI_Datatype *newtype),

which concatenates count copies of oldtype to obtain newtype. Concatenation is defined using extent as the size of the concatenated copies. For example, if oldtype has type map

{(double,0),(char,8)}

with extent 16, and count=3, then the type map of the datatype returned by newtype will be

{(double,0),(char,8),(double,16),(char,24),(double,32), (char,40)}.

The function

 int MPI_Type_vector(int count, int blocklen, int stride,    MPI_Datatype oldtype,     MPI_Datatype *newtype)

implements a more general constructor, which creates newtype by replication of oldtype into a sequence of count equally spaced blocks. Each block is obtained by concatenating blocklen copies of oldtype. The spacing between blocks is equal to stride extents of oldtype. For example, assume again that oldtype has type map

{(double,0),(char,8)}

with extent 16. Then a call to MPI_Type_vector(2,3,4,oldtype,_&newtype) will create the datatype with type map

{(double,0),(char,8),(double,16),(char,24),(double,32), (char,40), (double,64),(char,72),(double,80),(char,88), (double,96),(char,104)}.

The function

 int MPI_Type_hvector(int count, int blocklen,     MPI_Aint stride, MPI_Datatype oldtype,     MPI_Datatype *newtype)

is identical to MPI_Type_vector, except that stride is given in bytes rather than in elements. MPI_Aint is an opaque MPI type used specifically for integers specifying the size in bytes.

The function

 int MPI_Type_indexed(int count, int *array_of_blocklens,    int array_of_disps, MPI_Datatype oldtype,     MPI_Datatype *newtype)

creates newtype by replication of oldtype into a sequence of count blocks (each block is a concatenation of oldtype), where each block can contain a different number of copies and have a different displacement. The ith block contains array_of_blocklens[i] elements, which start at displacement array_of_disps[i] measured in multiples of the oldtype extent. The function

 int MPI_Type_hindexed(int count, int *array_of_blocklens,    MPI_Aint *array_of_disps,    MPI_Datatype oldtype,    MPI_Datatype *newtype)

is identical to MPI_Type_indexed, except that block displacements in array_of_disps are given in bytes rather than in multiples of the oldtype extent.

The function

 int MPI_Type_struct(int count, int *array_of_blocklens,    MPI_Aint *array_of_disps,    MPI_Datatype *array_of_types,    MPI_Datatype *newtype) 

implements the most general constructor. It further generalizes the previous constructor in that it allows each block to consist of replications of different datatypes. Namely array_of_types[i] specifies the type of elements in ith block.

A derived datatype has to be committed before it can be used in a communication operation. A committed datatype can still be used as an argument in datatype constructors. The function

 int MPI_Type_commit(MPI_Datatype *datatype)

implements the commit operation. The function

 int MPI_Type_free(MPI_ Datatype *datatype)

marks the datatype object for deallocation and sets datatype to MPI_DATATYPE_NULL. Derived datatypes that were defined from the freed datatype are not affected.

The function

 int MPI_Recv(void *buf, int n, MPI_Datatype datatype,     int source, int tag, MPI_Comm comm,     MPI_Status *status)

implements a standard blocking receive operation. The receive buffer consists of the storage containing n consecutive elements of the type specified by datatype, starting at address buf. The length of the data part of the received message must not be greater than the length of the receive buffer.

The selection of a message by a receive operation is governed by the value of the message envelope. A message can be received by a receive operation if its envelope matches the source, tag, and comm values specified by the receive operation. The receiver may specify a wildcard MPI_ANY_SOURCE value for source, and/or a wildcard MPI_ANY_TAG value for tag, indicating that any source and/or tag are acceptable. It cannot specify a wildcard value for comm. Thus a message can be received by a receive operation only if it is addressed to the receiving process, has a matching communicator, has matching source (unless any source is explicitly specified as acceptable), and has a matching tag (unless any tag is explicitly specified as acceptable). The argument source, if different from MPI_ANY_SOURCE, is specified as a rank within the process group associated with the communicator specified by comm.

Note the asymmetry between send and receive operations: a send operation always sends a message to a unique receiver, whereas a receive operation may accept messages from an arbitrary sender. That communication driven by the sender is known as a push communication (unlike a pull communication driven by the receiver).

The source or tag of a received message may not be known if wildcard values were used in the receive operation. In that case the information is returned by the status argument of the type MPI_Status, which is a structure that contains at least three fields named MPI_SOURCE, MPI_TAG, and MPI_ERROR. The fields contain the source, tag, and error code, respectively, of the received message. An error, for example, occurs if all incoming data do not fit, without truncation, into the receive buffer. Status variables need to be allocated by the user; that is, they are not system objects.

We have specified the presented send and receive operations as blocking communication operations. In general, return from a blocking operation means that resources used by the operation are allowed to be re-used.

In case of the receive operation, the blocking simply means that MPI_Recv returns only after the data part of the incoming message has been stored in the receive buffer. In case of the send operation, it means that MPI_Send does not return until the message data and envelope have been safely stored away so that the sender is free to access and overwrite the send buffer. The message might be copied directly into the matching receive buffer, or it might be copied into a temporary system buffer.

Message buffering decouples the send and receive operations. A blocking send can complete as soon as the message was buffered, even if no matching receive has been executed by the receiver.

MPI_Send uses the standard communication mode. In this mode it is up to MPI to decide whether outgoing messages will be buffered. MPI may buffer outgoing messages. In such a case the send call may complete before a matching receive is invoked. On the other hand, buffer space may be unavailable, and MPI may choose not to buffer outgoing messages, for performance reasons. In this case the send call will not complete until a matching receive has been posted, and the data have been moved to the receiver.

Thus a send in standard mode can be started whether or not a matching receive has been posted. It may complete before a matching receive is posted. The standard mode send is nonlocal. In general, an operation is nonlocal if its completion requires the execution of some MPI procedure on another process. In the send operation its completion may depend on the occurrence of a matching receive.

While the standard communication mode is predominantly used, it is not the only mode of send operation supported by MPI. There are three additional communication modes.

In buffered mode, if a send is executed and no matching receive is posted, then MPI must buffer the outgoing message, allowing the send to complete. Thus a buffered mode send operation is local because its completion does not depend on the occurrence of a matching receive. However, this mode is not as safe as standard mode because an error will occur if there is insufficient buffer space.

In synchronous mode, a send will complete successfully only if a matching receive was posted, and the receive operation has started to receive the message sent by the synchronous send. Thus the completion of a synchronous send not only indicates that the send buffer can be re-used but also that the receiver has reached a certain point in its execution, namely that it has started executing the matching receive. The blocking synchronous send and the blocking receive provide synchronous communication semantics: a communication does not complete at either end before both processes rendezvous at the communication. The synchronous send is obviously nonlocal.

In ready mode, a send may be started only if the matching receive is already posted. Otherwise, the operation is erroneous and its outcome is undefined.

The functions of the form

 int MPI_xsend(void *buf, int n, MPI_Datatype datatype,    int dest, int tag, MPI_Comm comm)

are provided for the three additional communication modes, where x is B for buffered, S for synchronous, and R for ready mode.

Any valid MPI implementation guarantees certain properties of point-to-point communication. First of all, it guarantees a certain order in receiving messages sent from the same source. If a sender sends two messages in succession to the same destination, and both match the same receive, then this operation cannot receive the second message while the first one is still pending. If a receiver posts two receives in succession, and both match the same message, then the second receive operation cannot be satisfied by this message while the first is still pending. Briefly, we can say that messages do not overtakes each other. This requirement facilitates matching of sends to receives and hence helps writing deterministic message-passing code. Notice that the LogP model is more liberal allowing messages directed to a given process not to arrive in the same order as they are sent.

Second, a certain progress in the execution of point-to-point communication is guaranteed. If a pair of matching send and receive has been initiated on two processes, then at least one of these two operations will complete, independently of other actions in the system: the send operation will complete, unless the receive is satisfied by another message, and completes; the receive operation will complete, unless the message sent is consumed by another matching receive that was posted at the same destination process.

Unfortunately, MPI makes no guarantee of fairness in the handling of communication. Suppose that a send is posted. Then it is possible that the destination process repeatedly posts a receive that matches this send, yet the message is never received because it is each time overtaken by another message sent from another source. It is the programmer’s responsibility to prevent such unfairness (e.g., by creatively using tags or communicators).

In many MPPs, communication can be executed autonomously by an intelligent communication controller. This allows communication to overlap computation, which results in more efficient parallel programs. However, the blocking communication operations do not support this type of optimization.

Suppose that a blocking receive is posted. It can complete only after the matching send has started and the received message is stored in the receive buffer. So, during this time slot, which can be quite significant, the calling process is actually doing nothing except waiting for a message to arrive. Similarly suppose that a standard mode send is posted. In the case of limited buffer space, the send may not complete until a matching receive has been posted, and the data have been moved to the receiver. Again, the calling process will be doing nothing during a significant time slot.

A possible mechanism for achieving such overlap is multithreading. One way would be to use parallel threads, some responsible for communication and some responsible for computation. This mechanism is external to MPI.

MPI provides its own alternative mechanism to approach the problem. The mechanism is nonblocking communication. The idea underlying nonblocking communication is to split a one-piece communication operation into two suboperations. The first suboperation just initiates the entire communication operation but does not complete it. So, immediately after the suboperation returns, the calling process will be able to do computations concurrently with the communication operation as it is being executed. The second suboperation completes this communication operation.

More specifically, a nonblocking send start call initiates the send operation but does not complete it. The send start call will return before the message was copied out of the send buffer. A separate send complete call is needed to complete the communicatin, namely to verify that the data has been copied out of the send buffer. Similarly a nonblocking receive start call initiates the receive operation but does not complete it. The call will return before a message is stored into the receive buffer. A separate receive complete call is needed to complete the receive operation and verify that the data have been received into the receive buffer.

Nonblocking send start calls can use the same four communication modes as blocking sends: standard, buffered, synchronous, and ready. These carry the same meaning. A nonblocking ready send can be started only if a matching receive is posted. Nonblocking sends of other modes can be started whether or not a matching receive has been posted. In all cases the send start call is local: it returns immediately, independent of the status of other processes.

If the send mode is synchronous, then the send complete call can return only if a matching receive has started. If the send mode is buffered, then the message must be buffered if there is no pending receive, and the send complete call must return independent of the status of a matching receive. If the send mode is standard, then the send complete call may return before a matching receive occurred if the message is buffered. On the other hand, the send complete may not return until a matching receive occurred, and the message was copied into the receive buffer.

Note that nonblocking sends can be matched with blocking receives, and vice-versa. Nonblocking communications use opaque request objects of the type MPI_Request to identify communication operations and match the operation that initiates the communication with the operation that completes it. These are system objects that are accessed via a handle. A request object identifies various properties of a communication operation, such as the send mode, the communication buffer that is associated with it, its context, the tag and destination arguments, to be used for a send, or the tag and source arguments to be used for a receive. In addition this object stores information about the status of the pending communication operation.

The functions of the form

 int MPI_Ixsend(void *buf, int n, MPI_Datatype datatype,     int dest, int tag,     MPI_Comm comm, MPI_Request *request)

are used to start a nonblocking send, where x is empty for standard, B for buffered, S for synchronous, and R for ready mode. A nonblocking send call indicates that the system may start copying data out of the send buffer. The sender should not access any part of the send buffer after a nonblocking send operation is called, until the send completes.

The function

 int MPI_Irecv(void *buf, int n, MPI_Datatype datatype,     int source, int_tag,_MPI_Comm_comm,_    MPI_Request_*request)

starts a nonblocking receive. A nonblocking receive call indicates that the system may start writing data into the receive buffer. The receiver should not access any part of the receive buffer after a nonblocking receive operation is called, until the receive completes.

All these calls allocate a communication request object and associate it with the request handle request. The request can be used later to query the status of the communication or wait for its completion.

The function

 int MPI_Wait(MPI_Request *request, MPI_Status *status)

is used to complete a nonblocking communication. It returns when the operation identified by request is complete. The request object associated with request is de-allocated and the request handle is set to MPI_REQUEST_NULL. The call returns, in status, information on the completed operation. MPI_Wait is a blocking nonlocal operation.

MPI provides another function

 int MPI_Test(MPI_Request *request, int *flag,     MPI_Status *status), 

that also can be used to complete a nonblocking communication. Unlike MPI_Wait, MPI_Test is a nonblocking operation. It returns in flag either true ( 0) or false (= 0). It returns true if the operation identified by request is complete. In such a case MPI_Test acts exactly as MPI_Wait. If false is returned, it means that the operation identified by request is not complete. In this case the value of the status object is undefined. The use of the nonblocking MPI_Test call allows the user to schedule alternative activities within a single thread of execution. An event-driven thread scheduler can be emulated with periodic calls to MPI_Test.

In addition to the two basic complete operations, MPI provides various complete operations that can be used to wait for the completion of any, some, or all the operations in a list, rather than having to wait for a specific message. The remaining point-to-point communications operations are aimed mainly at better optimisation of computer resources such as memory and processor cycles. MPI_Probe and MPI_Iprobe operations allow for incoming messages to be checked without actually receiving them. The user can then decide how to receive them, based on the information returned by the probe. In particular, the user may allocate memory for the receive buffer according to the length of the probed message.

Often a communication with the same argument list is repeatedly executed within the inner loop of a parallel computation. The loop may be optimized by binding the list of communication arguments to a so-called persistent communication request once, out of the loop, and then repeatedly using the request to initiate and complete messages in the loop.

4.2.4. Collective Communication

Collective communication is communication that involves a group of processes. The basic collective communication operations are as follows:

  • Barrier synchronization across all group members

  • Broadcast from one member to all members of a group

  • Data from all group members gathered to one member

  • Data from one member scattered to all members of a group

  • Global reduction operations such as sum, max, min, and user-defined functions.

MPI also provides a few advanced variations and combinations of the basic collective operations.

In order to execute a collective operation, all processes in the group must call the corresponding communication function with matching arguments. Several collective operations such as broadcast and gather have a single originating or receiving process. Such a process is called the root.

A barrier call returns only after all group members have entered the call. Other collective communication calls can return as soon as their participation in the collective communication is complete. The completion of a call indicates that the caller is now free to access locations in the communication buffer. It does not indicate that other processes in the group have completed or even started the operation. Therefore a portable MPI program should not count on the effect of synchronizing all calling processes for all collective communication calls, except barrier calls.

Collective communication calls may use the same communicators as point-to-point communication. MPI guarantees that messages generated on behalf of collective communication calls will not be confused with messages generated by point-to-point communication. This separation is achieved by using separate contexts for point-to-point and collective communications (see Section 4.2.2 for the communicator structure).

The function

 int MPI_Barrier(MPI_Comm comm)

blocks the caller until all members of the group associated with comm have called it. The call returns at any process only after all group members have entered the call.

The function

 int MPI_Bcast(void *buf, int count, MPI_Datatype datatype,     int root, MPI_Comm comm)

broadcasts a message from the process with rank root to all processes of the group, itself included. It is called by all members of the group using the same arguments for comm, root. On return, the contents of root’s communication buffer have been copied to all processes. General, derived datatypes are allowed for datatype. The type signature obtained by count-fold replication of the type signature of datatype on any process must be equal to that at the root. This implies that the amount of data sent must be equal to the amount received, pairwise, between each process and the root. Distinct type maps between sender and receiver are allowed.

The function

 int MPI_Gather(void *sendbuf, int sendcount,     MPI_Datatype sendtype, void *recvbuf,     int recvcount, MPI_Datatype recvtype,    int root, MPI_Comm comm)

implements the basic gather operation. Each process (the root process included) sends the contents of its send buffer to the root process. The root process receives the messages and stores them in rank order. The outcome is as if each of n processes in the group (including the root process) had sent a message to the root by executing a call to

MPI_Send(sendbuf, sendcount, sendtype, root,. . .),

and the root had executed n receives,

MPI_Recv(recvbuf+i*recvcount*recvtype_extent,    recvcount, recvtype, i,. . .),

where recvtype_extent is the extent of recvtype, i=0,1,…,n-1.

The receive buffer is ignored for all nonroot processes. General, derived datatypes are allowed for both sendtype and recvtype. The type signature obtained by sendcount-fold replication of the type signature of sendtype on process i must be equal to the type signature obtained by recvcount-fold replication of the type signature of recvtype at the root.

All arguments to the function are significant on the root process, while on other processes, only arguments sendbuf, sendcount, sendtype, root, and comm are significant. The arguments root and comm must have identical values on all processes. The specification of counts and types should not cause any location on the root to be written more than once. Note that recvcount is the number of items the root receives from each process, not the total number of items it receives.

The function

 int MPI_Gatherv(void *sendbuf, int sendcount,     MPI_Datatype sendtype, void *recvbuf,     int *recvcounts, int *displs,     MPI_Datatype recvtype, int root, MPI_Comm comm)

extends the functionality of MPI_Gather by allowing a varying count of data from each process, since recvcounts is now an array. It also allows more flexibility as to where the data are placed on the root, by providing the new argument displs. The outcome is as if each of n processes in the group (including the root process) had sent a message to the root,

MPI_Send(sendbuf, sendcount, sendtype, root,. . .),

and the root had executed n receives,

MPI_Recv(recvbuf+disp[i]*recvtype_extent,    recvcounts[i], recvtype, i,. . .). 

Messages are placed in the receive buffer of the root process in rank order, that is, the data sent from process i is placed in the i-th portion of recvbuf. This i-th portion begins at offset displs[i] elements (in terms of recvtype) into recvbuf.

The function

 int MPI_Scatter(void *sendbuf, int sendcount,     MPI_Datatype sendtype, void *recvbuf,     int recvcount, MPI_Datatype recvtype,    int root, MPI_Comm comm)

is the inverse operation to MPI_Gather. The outcome is as if the root executes n sends,

MPI_Send(sendbuf+i*sendcount*sendtype_extent,    sendcount, sendtype, i,. . .),

where sendtype_extent is the extent of sendtype, and each process executes a receive,

MPI_Recv(recvbuf, recvcount, recvtype, root,. . .).

The function

 int MPI_Scatterv(void *sendbuf, int *sendcounts.    int *displs, MPI_Datatype sendtype,     void *recvbuf, int recvcount.    MPI_Datatype recvtype, int root, MPI_Comm comm)

is the inverse operation to MPI_Gatherv. It extends the functionality of MPI_Scatter by allowing a varying count of data to be sent to each process, since sendcounts is now an array. It also allows more flexibility as to where the data are taken from the root by providing the new argument displs. The outcome is as if the root executes n sends,

MPI_Send(sendbuf+displs[i]*sendtype_extent,    sendcounts[i], sendtype, i,. . .),

and each process executes a receive,

MPI_Recv(recvbuf, recvcount, recvtype, root,. . .).

The function

 int MPI_Reduce(void *inbuf, void *outbuf, int count,    MPI_Datatype datatype, MPI_Op op,    int root, MPI_Comm comm)

combines the elements provided in the input buffer of each process in the group, using the operation op, and returns the combined value in the output buffer of root. The input buffer is defined by the arguments inbuf, count, and datatype. The output buffer is defined by the arguments outbuf, count, and datatype. Both buffers have the same number of elements, with the same type. The arguments count, datatype, op, root, and comm must have identical values on all processes. Thus all processes provide input buffers and output buffers of the same length, with elements of the same type. Each process can provide one element, or a sequence of elements, in which case the combine operation is executed elementwise on each entry of the sequence.

The operation op can be either one of a pre-defined list of operations, or a user-defined operation. In general, an operation is an opaque object of the type MPI_Op. MPI provides the following pre-defined constants of that type for pre-defined operations: MPI_MAX (maximum), MPI_MIN (minimum), MPI_SUM (sum), MPI_PROD (product), MPI_LAND (logical and), MPI_BAND (bitwise and), MPI_LOR (logical or), MPI_BOR (bitwise or), MPI_LXOR (logical exclusive or), MPI_BXOR (bitwise exclusive or), MPI_MAXLOC (maximum value and its location), MPI_MINLOC (minimum value and its location).

For all the pre-defined operations, except MPI_MAXLOC and MPI_MINLOC, the datatype argument is an appropriate basic datatype. For example, the following function uses MPI_Reduce to compute the dot product of two real vectors that are distributed across a group of processes (for simplicity we assume that the length of the vectors is a multiple of the number processes in the group):

 double dot_product(double *x, double *y, int m, MPI_Comm comm) {    double local_dot_prod;    double global_dot_prod; /* result (at process zero) */       int i;    local_dot_prod = 0.;    for(i=0; i<m; i++)       local_dot_prod += x[i]*y[i];    MPI_Reduce(&local_dot_prod, &global_dot_prod, 1,             MPI_DOUBLE, MPI_SUM, 0, comm);    return global_dot_prod; } 

The argument m is the length of local subvectors x and y of the entire distributed vectors. The function returns the result at the process with rank zero.

The operation MPI_MAXLOC (MPI_MINLOC) computes a global maximum (minimum) and the rank of the process containing this value. The operation is applied to reduce a sequence of pairs. Each process supplies a value and its rank within the group, and the reduce operation will return the maximum (minimum) value and the rank of the first process (in rank order) with that value. Each pair is supplied in the form of a structure that contains two fields: one for value and other for rank. The rank field is always of the int type. The value field has distinct types depending on the type of value. In order to use MPI_MAXLOC and MPI_MINLOC in a reduce operation, one must provide a datatype argument that represents a pair (value and rank). MPI provides seven such pre-defined datatypes: MPI_FLOAT_INT (for float values), MPI_DOUBLE_INT (for double values), MPI_LONG_INT (for long values), MPI_2INT (for int values), MPI_SHORT_INT (for short values), MPI_LONG_DOUBLE_INT (for long double values).

The following function normloc uses MPI_Reduce to compute the maximum element of a real vector that is distributed across a group of processes and the rank of the process containing the largest element (for simplicity we assume that the length of the vector is a multiple of the number of processes in the group):

 typedef struct {double val; int rank;} double_int; double_int normloc(double *vec, int m, MPI_Comm comm) _{    double_int *in, *out, res;    int i, myrank;    in = calloc(m, sizeof(double_int));    out = calloc(m, sizeof(double_int));    MPI_Comm_rank(comm, &myrank);    for(i=0; i<m; i++) {       in[i].val = vec[i];       in[i].rank = myrank;    }    MPI_Reduce(in, out, m, MPI_DOUBLE_INT, MPI_MAXLOC, 0, comm);    if(myrank == 0) {       res = out[0];       for(i=1; i<m; i++)          if(res.val < out[i].val) {             res.val = out[i].val;             res.rank = out[i].rank;          }          else if(res.val == out[i].val && res.rank > out [i].rank)             res.rank = out[i].rank;    }    free(in);    free(out);    return res; }

The argument m is the length of the local subvector vec of the entire distributed vector. The function returns the result at process with rank zero.

All pre-defined operations are assumed to be associative and commutative. User-defined operations are assumed to be associative but may not be commutative. MPI provides the function

 int MPI_Op_create(MPI_User_function *fun, int commute, MPI_Op *op)

which binds a user-defined operation to an op handle that can subsequently be used in MPI_Reduce. If commute is zero (false), then the user-defined operation is assumed to be not commutative. In this case the order of operands is fixed and is defined to be in ascending process rank order, beginning with process zero. The order of evaluation can be changed, which takes advantage of the associativity of the operation. If commute is nonzero (true), the operation should be both commutative and associative. In that case, the order of evaluation can be changed, which takes advantage of commutativity and associativity.

The argument fun is the user-defined function, which type can be specified as follows:

 typedef void MPI_User_function(void *invec.    void *inoutvec, int len,     MPI_Datatype datatype);.

The datatype argument is a handle to the data type that was passed into the call to MPI_Reduce. The arguments invec and inoutvec are arrays of len elements of the type specified by datatype that the function fun is combining. The result of the reduction overwrites values in inoutvec. Each invocation of the function results in the pointwise evaluation of the reduce operator on len elements; that is, the function fun returns in inoutvec[i] the value invec[i]o inoutvec[i], for i=0,. . .,len-1, where o is the combining operation computed by the function.

General datatypes may be passed to the user-defined function. No MPI communication function may be called inside the user-defined function.

The function

 int MPI_Op_free(MPI_Op *op)

marks a user-defined reduction operation for deallocation and sets op to MPI_OP_NULL.

4.2.5. Environmental Management

MPI provides a few functions for various parameters that relate to the MPI implementation and the execution environment. For example, the function

 int MPI_Get_processor_name(char *name, int *resultlen)

returns the name of the processor, on which it was called, in the form of a character string. The argument name must represent storage that is at least MPI_MAX_PROCESSOR_NAME characters long. The number of characters actually written is returned in resultlen.

The function

 double MPI_Wtime(void)

returns a floating-point number of seconds, representing elapsed wall-clock time since some time in the past. This time in the past is guaranteed not to change during the life of the process. The times returned are local to the node that called them. There is no requirement that different nodes return the same time.

The function

 double MPI_Wtick(void)

returns the resolution if MPI_Wtime in seconds. That is, it returns, as a double precision value, the number of seconds between sucessive clock ticks.

4.2.6. Example of an MPI Application: Parallel Matrix-Matrix Multiplication

In order to illustrate parallel programming MPPs with MPI, we consider an application implementing the simplest parallel algorithm of matrix-matrix multiplication presented in Section 4.1. The source code of the application is as follows:

#include <stdio.h> #include <stdlib.h> #include <float.h> #include <mpi.h> int n, p; int main(int argc, char **argv) { int myn, myrank; double *a, *b, *c, *allB, start, sum, *allC, sumdiag; int i, j, k;    n = atoi(argv[1]);    MPI_Init(&argc, &argv);    MPI_Comm_size(MPI_COMM_WORLD,&p);    MPI_Comm_rank(MPI_COMM_WORLD,&myrank);    myn = n/p;    a = malloc(myn*n*sizeof(double));    b = malloc(myn*n*sizeof(double));    c = malloc(myn*n*sizeof(double));    allB = malloc(n*n*sizeof(double));    for(i=0; i<myn*n; i++) {          a[i] = 1.;          b[i] = 2.;    }    MPI_Barrier(MPI_COMM_WORLD);    if(myrank==0)       start = MPI_Wtime();    for(i=0; i<p; i++)       MPI_Gather(b, myn*n, MPI_DOUBLE, allB, myn*n,                    MPI_DOUBLE, i, MPI_COMM_WORLD);    for(i=0; i<myn; i++)       for(j=0; j<n; j++) {          sum = 0.;          for(k=0; k<n; k++)             sum += a[i*n+k]*allB[k*n+j];          c[i*n+j] = sum;       }    free(allB);    MPI_Barrier(MPI_COMM_WORLD);    if(myrank==0)       printf("It took %f seconds to multiply 2 %dx%d matrices.\n",                    MPI_Wtime()-start, n, n);    if(myrank==0)       allC = malloc(n*n*sizeof(double));    MPI_Gather(c, myn*n, MPI_DOUBLE, allC, myn*n,                    MPI_DOUBLE, 0, MPI_COMM_WORLD);    if(myrank==0) {       for(i=0, sumdiag=0.; i<n; i++)          sumdiag += allC[i*n+i];       printf("The trace of the resulting matrix is %f\n", sumdiag);    }    if(myrank==0)       free(allC);    MPI_Finalize();    free(a);    free(b);    free(c); }

It is assumed that via an external parameter, the user supplies the value for n and that this value is a multiple of the total number of processes executing the program, p. It is also assumed that process zero (myrank=0) runs on the node with I/O available. Each process has the following arrays on heap:

  • a, b, and c to store its horizontal myn X n slice of matrices A, B, and C respectively.

  • allB to store the full matrix B. This array is deallocated as soon as the process has computed all elements of its C slice.

The array allC is allocated on process zero after the parallel multiplication itself has been complete. This array is used as a receive buffer by the collective operation that gathers all slices of the resulting matrix C on process zero. After the matrix C is gathered on process zero in the array allC, this process computes the trace of the matrix and outputs it (for the user to check the correctness of the computations).

By the way, the loop

 for(i=0; i<p; i++)    MPI_Gather(b, myn*n, MPI_DOUBLE, allB, myn*n,           MPI_DOUBLE, i, MPI_COMM_WORLD);

which results in all processes receiving the matrix B, can be replaced by a single call of the form

MPI_Allgather(b, myn*n, MPI_DOUBLE, allB, myn*n.    MPI_DOUBLE, MPI_COMM_WORLD);.

MPI_Allgather is an example of an advanced variation of the basic gather operation. It is assumed that MPI can implement MPI_Allgather more efficiently than via successive calls to MPI_Gather.




Parallel Computing on Heterogeneous Networks
Parallel Computing on Heterogeneous Networks (Wiley Series on Parallel and Distributed Computing)
ISBN: B000W7ZQBO
EAN: N/A
Year: 2005
Pages: 95

flylib.com © 2008-2017.
If you may any questions please contact us: flylib@qtcs.net