\documentclass[a4paper,10pt]{article}
\usepackage[utf8]{inputenc}
\usepackage{graphicx}
\usepackage{listings}
\usepackage[usenames,dvipsnames,svgnames,table]{xcolor}
\usepackage{amsmath}

% Title Page
\title{Note for MPI Endpoints}
\author{}

\begin{document}
\maketitle

%\begin{abstract}
%\end{abstract}

\section{Purpose}

Use threads as if they were MPI processes. Each thread is assigned a rank and is associated with an endpoints communicator (EP\_Comm).
Convention: one OpenMP thread corresponds to one endpoint.

\section{MPI Endpoints Semantics}

\begin{center}
\includegraphics[scale=0.4]{scheme.png}
\end{center}

Endpoints are created from one MPI communicator and the number of available threads:
\begin{center}
\begin{verbatim}
int MPI_Comm_create_endpoints(MPI_Comm parent_comm, int num_ep,
                              MPI_Info info, MPI_Comm out_comm_hdls[])
\end{verbatim}
\end{center}

``In this collective call, a single output communicator is created, and an array of \verb|num_ep| handles to this new communicator are returned, where the $i^{th}$ handle corresponds to the $i^{th}$ rank requested by the caller of \verb|MPI_Comm_create_endpoints|. Ranks in the output communicator are ordered sequentially and in the same order as the parent communicator. After it has been created, the output communicator behaves as a normal communicator, and MPI calls on each endpoint (i.e., communicator handle) behave as though they originated from a separate MPI process. In particular, collective calls must be made once per endpoint.''\cite{Dinan:2013}

``Once created, endpoints behave as MPI processes. For example, all ranks in an endpoints communicator must participate in collective operations. A consequence of this semantic is that endpoints also have MPI process progress requirements; that operations on that endpoint are required to make progress only when an MPI operation (e.g. \verb|MPI_Test|) is performed on that endpoint. This semantic enables an MPI implementation to logically separate endpoints, treat them independently within the progress engine, and eliminate synchronization in updating their state.''\cite{Sridharan:2014}
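To illustrate these semantics, the following minimal usage sketch (not taken from the EP library) creates one endpoint per OpenMP thread from \verb|MPI_COMM_WORLD|, following the prototype above; the fixed-size handle array and the absence of error handling are simplifications.

\begin{verbatim}
#include <mpi.h>
#include <omp.h>

int main(int argc, char **argv)
{
    int provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);

    int num_ep = omp_get_max_threads();
    MPI_Comm ep_comms[64];           /* assume num_ep <= 64 in this sketch */

    /* Collective over the parent communicator: each MPI process obtains
       one handle per local thread/endpoint. */
    MPI_Comm_create_endpoints(MPI_COMM_WORLD, num_ep,
                              MPI_INFO_NULL, ep_comms);

    #pragma omp parallel num_threads(num_ep)
    {
        /* Convention: OpenMP thread i uses the i-th endpoint handle. */
        MPI_Comm comm = ep_comms[omp_get_thread_num()];

        int ep_rank, ep_size;
        MPI_Comm_rank(comm, &ep_rank);
        MPI_Comm_size(comm, &ep_size);

        /* From here on each thread behaves as an MPI process of comm:
           collectives must be called once per endpoint. */
        MPI_Barrier(comm);
    }

    MPI_Finalize();
    return 0;
}
\end{verbatim}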
\section{EP types}

\subsection{MPI\_Comm}

\verb|MPI_Comm| is composed of:
\begin{itemize}
 \item[$\bullet$] \verb|bool is_ep|: true $\implies$ EP, false $\implies$ classic MPI;
 \item[$\bullet$] \verb|int mpi_comm|: handle to the parent MPI communicator;
 \item[$\bullet$] \verb|OMPbarrier *ep_barrier|: OpenMP barrier used for in-process synchronization; it is distinct from the \verb|omp barrier| directive;
 \item[$\bullet$] \verb|int[2] size_rank_info[3]|: topology information of the current endpoint:
 \begin{itemize}
  \item rank of the parent MPI process;
  \item size of the parent MPI communicator;
  \item rank of the endpoint, returned by \verb|MPI_Comm_rank|;
  \item size of the EP communicator, returned by \verb|MPI_Comm_size|;
  \item in-process rank of the endpoint;
  \item in-process size of the EP communicator, also called the number of endpoints in one MPI process.
 \end{itemize}
 \item[$\bullet$] \verb|MPI_Comm *comm_list|: pointer to the first endpoint communicator of the process;
 \item[$\bullet$] \verb|Message_list *message_queue|: queue of incoming messages for each endpoint;
 \item[$\bullet$] \verb|RANK_MAP *rank_map|: a map from an integer to a pair of integers. The integer key represents the rank of an endpoint. The mapped type (pair of integers) gives the in-process rank of the endpoint and the rank of its parent MPI process:
\begin{center}
\begin{verbatim}
rank_map->at(ep_rank)=(ep_rank_local, mpi_rank)
\end{verbatim}
\end{center}
 \item[$\bullet$] \verb|BUFFER *ep_buffer|: buffer (of type \verb|int|, \verb|float|, \verb|double|, \verb|char|, \verb|long|, and \verb|unsigned long|) used for in-process communication.
\end{itemize}

\subsection{MPI\_Request}

\verb|MPI_Request| is composed of:
\begin{itemize}
 \item[$\bullet$] \verb|int mpi_request|: handle to the MPI request;
 \item[$\bullet$] \verb|int ep_datatype|: data type of the communication;
 \item[$\bullet$] \verb|MPI_Comm comm|: handle to the EP communicator;
 \item[$\bullet$] \verb|int ep_src|: rank of the source endpoint;
 \item[$\bullet$] \verb|int ep_tag|: tag of the communication;
 \item[$\bullet$] \verb|int type|: type of the communication:
 \begin{itemize}
  \item 1 $\implies$ non-blocking send;
  \item 2 $\implies$ pending non-blocking receive;
  \item 3 $\implies$ non-blocking matching receive.
 \end{itemize}
\end{itemize}

\subsection{MPI\_Status}

\verb|MPI_Status| consists of:
\begin{itemize}
 \item[$\bullet$] \verb|int mpi_status|: handle to the MPI status;
 \item[$\bullet$] \verb|int ep_datatype|: data type of the communication;
 \item[$\bullet$] \verb|int ep_src|: rank of the source endpoint;
 \item[$\bullet$] \verb|int ep_tag|: tag of the communication.
\end{itemize}

\subsection{MPI\_Message}

\verb|MPI_Message| includes:
\begin{itemize}
 \item[$\bullet$] \verb|int mpi_message|: handle to the MPI message;
 \item[$\bullet$] \verb|int ep_src|: rank of the source endpoint;
 \item[$\bullet$] \verb|int ep_tag|: tag of the communication.
\end{itemize}

Other types, such as \verb|MPI_Info|, \verb|MPI_Aint|, and \verb|MPI_Fint|, are defined in the same way.

\section{P2P communication}

All EP point-to-point communications use the tag to distinguish the source and destination endpoints. To be able to add this extra information to the tag, we require that the tag value be representable using 31 bits in the underlying MPI implementation.

\includegraphics[scale=0.4]{tag.png}

EP\_tag is user defined. MPI\_tag is computed internally and used inside MPI calls. Because of this extension of the tag, wildcards such as \verb|MPI_ANY_SOURCE| and \verb|MPI_ANY_TAG| cannot be used directly. An extra step of tag analysis is needed, which leads to the message dequeuing mechanism.

\begin{center}
\includegraphics[scale = 0.5]{sendrecv.png}
\end{center}

In an MPI environment, each MPI process has an incoming message queue. In the EP case, messages for all threads inside one MPI process are stored in this MPI queue. Following the MPI~3 standard, we use the \verb|MPI_Improbe| routine to query the message queue and relocate each incoming message into the local message queue of the corresponding thread/endpoint.

\includegraphics[scale=0.3]{dequeue.png}

% Any EP calls will trigger the message dequeuing and the probing, (matched-)receiving operations are performed upon the local message queue.
%
% Example: \verb|EP_Recv(src=2, tag=10, comm1)|:
% \begin{itemize}
% \item[1.] Dequeue MPI message queue;
% \item[2.] call \verb|EP_Improb(src=2, tag=10, comm1, message)|;
% \item[3.] if find corresponding triple (src, tag, comm1), call \verb|EP_Mrecv(src=2, tag=10, comm1, message)|;
% \item[4.] else, repeat from step 2.
% \end{itemize}
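The dequeuing step can be sketched as follows. The \verb|decode_ep_*| helpers are hypothetical stand-ins for the tag decoding implied by the layout above, and a real implementation would additionally have to serialize concurrent access to the shared MPI queue.

\begin{verbatim}
#include <mpi.h>
#include <list>
#include <vector>

// Message handle parked in the local queue of the destination endpoint.
struct Pending_message {
    MPI_Message handle; // matched-probe handle, later consumed by MPI_Imrecv
    int ep_src;         // source endpoint decoded from the extended tag
    int ep_tag;         // user-level (EP) tag decoded from the extended tag
};
using Message_list = std::list<Pending_message>;

// Hypothetical helpers: the actual layout is the one pictured above.
int decode_ep_dest(int mpi_tag);
int decode_ep_src(int mpi_tag);
int decode_ep_tag(int mpi_tag);

// Drain the MPI-level queue of the parent communicator and distribute the
// probed messages to the per-endpoint local queues.
void dequeue_mpi_messages(MPI_Comm parent_mpi_comm,
                          std::vector<Message_list> &local_queues)
{
    for (;;) {
        int flag = 0;
        MPI_Message msg;
        MPI_Status status;
        MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, parent_mpi_comm,
                    &flag, &msg, &status);
        if (!flag) break;                     // MPI queue is empty

        Pending_message pm;
        pm.handle = msg;
        pm.ep_src = decode_ep_src(status.MPI_TAG);
        pm.ep_tag = decode_ep_tag(status.MPI_TAG);

        // The destination endpoint is also encoded in the MPI tag.
        local_queues[decode_ep_dest(status.MPI_TAG)].push_back(pm);
    }
}
\end{verbatim}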
\paragraph{Messages are \textit{non-overtaking}}
The order of incoming messages matters. If one thread receives multiple messages from the same source with the same tag, the receive order must be the order in which the messages were sent; that is to say, the $n$-th message sent should be the $n$-th message received.

\paragraph{Progress}
``If a pair of matching send and receives have been initiated on two processes, then at least one of these two operations will complete, independently of other actions in the system: the send operation will complete, unless the receive is satisfied by another message, and completes; the receive operation will complete, unless the message sent is consumed by another matching receive that was posted at the same destination process.'' \cite{MPI}

When an \verb|EP_Irecv| is issued, we first dequeue the MPI incoming message queue and distribute all incoming messages to the local queues according to the destination identifier. Next, the non-blocking receive request is added at the end of the pending request list. Third, the pending list is checked and requests with matching source, tag, and communicator are completed.

Because of the importance of message order, some communication completion functions must be discussed here, such as \verb|MPI_Test| and \verb|MPI_Wait|.

``The functions \verb|MPI_Wait| and \verb|MPI_Test| are used to complete a nonblocking communication. The completion of a send operation indicates that the sender is now free to update the locations in the send buffer (the send operation itself leaves the content of the send buffer unchanged). It does not indicate that the message has been received, rather, it may have been buffered by the communication subsystem. However, if a synchronous mode send was used, the completion of the send operation indicates that a matching receive was initiated, and that the message will eventually be received by this matching receive. The completion of a receive operation indicates that the receive buffer contains the received message, the receiver is now free to access it, and that the status object is set. It does not indicate that the matching send operation has completed (but indicates, of course, that the send was initiated).'' \cite{MPI}

\paragraph{Example 1} \verb|MPI_Test(MPI_Request *request, int *flag, MPI_Status *status)| behaves as follows (a condensed sketch is given after this list):
\begin{itemize}
 \item[1.] If \verb|request->type == 1|, the communication to be tested was indeed issued from a non-blocking send. The completion status is returned by:
\begin{center}
\begin{verbatim}
MPI_Test(&request->mpi_request, flag, &status->mpi_status)
\end{verbatim}
\end{center}
 \item[2.] If \verb|request->type == 2|, a non-blocking receive was called but the corresponding message has not yet been probed. The request is in the pending list and thus not yet completed. All incoming messages are probed once again and all pending requests are checked. If, after this second check, the matching message is found, an \verb|MPI_Imrecv| is called and the type is set to 3. Otherwise, the type remains 2 and \verb|flag = false| is returned.
 \item[3.] If \verb|request->type == 3|, the request was issued from a non-blocking receive call and the matching message has already been probed, so the status of the communication lies in the status of the \verb|MPI_Imrecv| call. The completion result is returned by:
\begin{center}
\begin{verbatim}
MPI_Test(&request->mpi_request, flag, &status->mpi_status)
\end{verbatim}
\end{center}
\end{itemize}
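The dispatch of Example 1 can be condensed into the following sketch; \verb|ep_request| and \verb|check_pending_requests| are simplified, hypothetical stand-ins for the EP library's request wrapper and its probe-and-match step.

\begin{verbatim}
#include <mpi.h>

// Simplified stand-in for the EP request wrapper of the "EP types" section
// (the real type stores integer handles and a few more fields).
struct ep_request {
    MPI_Request mpi_request; // underlying MPI request
    MPI_Comm    comm;        // EP communicator of the operation
    int         type;        // 1: Isend, 2: pending Irecv, 3: matched Irecv
};

// Hypothetical helper: dequeue incoming messages again and, if the matching
// message for this request is found, call MPI_Imrecv and set type to 3.
void check_pending_requests(ep_request *request);

int ep_test(ep_request *request, int *flag, MPI_Status *status)
{
    if (request->type == 2) {
        // Pending receive: probe and match once more before giving up.
        check_pending_requests(request);
        if (request->type != 3) {
            *flag = 0;                    // still no matching message
            return MPI_SUCCESS;
        }
    }
    // Non-blocking send (type 1) or matched receive (type 3): the classic
    // MPI_Test of the underlying request yields the completion status.
    return MPI_Test(&request->mpi_request, flag, status);
}
\end{verbatim}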
\paragraph{Example 2} \verb|MPI_Wait(MPI_Request *request, MPI_Status *status)| behaves as follows:
\begin{itemize}
 \item[1.] If \verb|request->type == 1|, the communication to be completed was indeed issued from a non-blocking send. Jump to step 4.
 \item[2.] If \verb|request->type == 2|, a non-blocking receive was called but the corresponding message has not yet been probed. The request is in the pending list and thus not yet completed. We repeat the incoming-message probing and the pending-request checking until the matching message is found; an \verb|MPI_Imrecv| is then called and the type is set to 3. Jump to step 4.
 \item[3.] If \verb|request->type == 3|, the request was issued from a non-blocking receive call and the matching message has already been probed, so the status of the communication lies in the status of the \verb|MPI_Imrecv| call. Jump to step 4.
 \item[4.] We force the completion by calling:
\begin{center}
\begin{verbatim}
MPI_Wait(&request->mpi_request, &status->mpi_status)
\end{verbatim}
\end{center}
\end{itemize}

\section{Collective communication}

All classic MPI collective communications are performed following the same pattern:
\begin{itemize}
 \item[1.] Intra-process communication using OpenMP, \textit{e.g.} collect data from the slave threads to the master thread.
 \item[2.] Inter-process communication using MPI collective calls on the master threads.
 \item[3.] Intra-process communication using OpenMP, \textit{e.g.} distribute data from the master thread to the slave threads.
\end{itemize}

\paragraph{Example 1} \verb|EP_Bcast(buffer, count, datatype, root = 4, comm)| with \verb|comm| composed of 4 MPI processes and 3 threads per process. We can consider the communicator as
$\{\underbrace{(0,1,2)}_\textrm{proc 0} \quad \underbrace{(3,\textcolor{red}{4},5)}_\textrm{proc 1}\quad \underbrace{(6,7,8)}_\textrm{proc 2}\quad \underbrace{(9,10,11)}_\textrm{proc 3}\}$.
This collective communication is performed in the following three steps:
\begin{itemize}
 \item[1.] The EP process with rank 4 sends the buffer to the EP process with rank 3, which is a master thread.
 \item[2.] We call \verb|MPI_Bcast(buffer, count, datatype, mpi_root = 1, mpi_comm)|.
 \item[3.] All master threads send the buffer to their slaves.
\end{itemize}

\begin{center}
\includegraphics[scale=0.3]{bcast.png}
\end{center}

\paragraph{Example 2} \verb|EP_Allreduce(sendbuf, recvbuf, count, datatype, op, comm)| with \verb|comm| the same as in Example 1. This collective communication is performed in the following three steps:
\begin{itemize}
 \item[1.] We perform an intra-process ``allreduce'' operation: master threads collect data from their slaves and perform the reduction.
 \item[2.] Master threads call the classic \verb|MPI_Allreduce| routine.
 \item[3.] All master threads send the updated reduced data to their slaves.
\end{itemize}

\begin{center}
\includegraphics[scale=0.3]{allreduce.png}
\end{center}

Other collective communications follow a similar execution pattern.
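As an illustration of this pattern, a schematic and much simplified version of the EP broadcast could look as follows; the per-process staging buffer, the plain \verb|memcpy| and the OpenMP barriers stand in for the library's shared buffers and \verb|OMPbarrier|, and the routine is assumed to be called by every thread inside an OpenMP parallel region.

\begin{verbatim}
#include <mpi.h>
#include <cstddef>
#include <cstring>

// Schematic three-step EP broadcast. shared_buf is a per-process buffer
// visible to all threads; ep_rank, ep_rank_local and mpi_root follow the
// naming of the "EP types" section.
void ep_bcast_sketch(void *buffer, int count, MPI_Datatype datatype,
                     int ep_root, int ep_rank, int ep_rank_local,
                     int mpi_root, MPI_Comm parent_mpi_comm,
                     void *shared_buf, std::size_t nbytes)
{
    // 1. Intra-process: the root endpoint stages its data for the master.
    if (ep_rank == ep_root)
        std::memcpy(shared_buf, buffer, nbytes);
    #pragma omp barrier

    // 2. Inter-process: only master threads (in-process rank 0) call MPI.
    if (ep_rank_local == 0)
        MPI_Bcast(shared_buf, count, datatype, mpi_root, parent_mpi_comm);
    #pragma omp barrier

    // 3. Intra-process: every other endpoint copies the broadcast data.
    if (ep_rank != ep_root)
        std::memcpy(buffer, shared_buf, nbytes);
}
\end{verbatim}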
\section{Inter-communicator}

In XIOS, the inter-communicator is a very important component. Thus, our EP library must support inter-communications.

\subsection{The splitting of an intra-communicator}

Before discussing the inter-communicator, we start with the splitting of an intra-communicator. The C prototype of the splitting routine is
\begin{center}
\begin{verbatim}
int MPI_Comm_split(MPI_Comm comm, int color, int key, MPI_Comm *newcomm)
\end{verbatim}
\end{center}

``This function partitions the group associated with \verb|comm| into disjoint subgroups, one for each value of \verb|color|. Each subgroup contains all processes of the same color. Within each subgroup, the processes are ranked in the order defined by the value of the argument \verb|key|, with ties broken according to their rank in the old group. A new communicator is created for each subgroup and returned in \verb|newcomm|. A process may supply the color value \verb|MPI_UNDEFINED|, in which case \verb|newcomm| returns \verb|MPI_COMM_NULL|. This is a collective call, but each process is permitted to provide different values for color and key.''\cite{MPI}

By definition of the routine, in the EP case each thread participating in the split operation has exactly one color (\verb|MPI_UNDEFINED| is also considered to be a color). However, from the process's point of view, a process can have multiple colors, as shown in the following figure.

\begin{center}
\includegraphics[scale=0.4]{split.png}
\end{center}

This figure shows the result of the EP communicator splitting. Here we used the EP rank as the key to assign the new rank of each thread in the resulting split intra-communicator. If the key is anything other than the EP rank, we follow the convention that the key takes effect only inside a process: threads are ordered first by the MPI process rank and then by the value of the key.

Because one process can have multiple colors among its threads, the splitting operation is executed in the following steps (a sketch of the master-thread side is given after the figure below):
\begin{itemize}
 \item[1.] Master threads collect all colors from their slaves and communicate with each other to determine the total number of colors across the communicator.
 \item[2.] For each color, the master thread checks all its slave threads to obtain the number of threads holding that color.
 \item[3.] If at least one of the slave threads holds the color, the master thread takes this color; if not, the master thread takes the color \verb|MPI_UNDEFINED|. All master threads call the classic communicator splitting routine with key $=$ MPI rank.
 \item[4.] For master threads holding a defined color, we execute the endpoint creation routine according to the number of slave threads holding the same color. The resulting EP communicators are then assigned to these slave threads.
\end{itemize}

\begin{center}
\includegraphics[scale=0.4]{split2.png}
\end{center}
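A coarse sketch of the master-thread side of this procedure is given below; the gathering of colors over the parent communicator (step 1) is assumed to have been done already, and \verb|create_endpoints_for_color| is a hypothetical stand-in for the endpoint creation and hand-off of step 4.

\begin{verbatim}
#include <mpi.h>
#include <set>
#include <vector>

// Hypothetical helper standing for step 4: create num_ep endpoints on top
// of split_mpi_comm and hand the handles to the local threads holding color.
void create_endpoints_for_color(MPI_Comm split_mpi_comm, int num_ep, int color);

// Master-thread side of the EP split. local_colors holds the color requested
// by each thread of this process, all_colors the set of colors gathered over
// the parent MPI communicator (step 1).
void ep_split_master_sketch(MPI_Comm parent_mpi_comm, int mpi_rank,
                            const std::vector<int> &local_colors,
                            const std::set<int>    &all_colors)
{
    for (int color : all_colors) {
        // 2. Count the local threads that requested this color.
        int num_ep = 0;
        for (int c : local_colors)
            if (c == color) ++num_ep;

        // 3. The master joins the MPI-level split only if at least one of
        //    its threads holds the color; the key is the parent MPI rank.
        int my_color = (num_ep > 0) ? color : MPI_UNDEFINED;
        MPI_Comm split_mpi_comm;
        MPI_Comm_split(parent_mpi_comm, my_color, mpi_rank, &split_mpi_comm);

        // 4. Endpoint creation for the threads holding this color.
        if (split_mpi_comm != MPI_COMM_NULL)
            create_endpoints_for_color(split_mpi_comm, num_ep, color);
    }
}
\end{verbatim}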
\subsection{The creation of the inter-communicator}

In XIOS, the inter-communicators are created by the routine \verb|MPI_Intercomm_create|, which binds two intra-communicators into an inter-communicator. The C prototype is
\begin{center}
\begin{verbatim}
int MPI_Intercomm_create(MPI_Comm local_comm, int local_leader,
                         MPI_Comm peer_comm, int remote_leader,
                         int tag, MPI_Comm *newintercomm)
\end{verbatim}
\end{center}

According to the MPI standard, ``an inter-communication is a point-to-point communication between processes in different groups''. ``All inter-communicator constructors are blocking except for \verb|MPI_COMM_IDUP| and require that the local and remote groups be disjoint.''

Since in EP the threads are considered as processes, the non-overlapping condition translates to ``non-overlapping'' at the thread level, which means that one thread cannot belong to both the local group and the remote group. However, the parent processes of the threads may overlap. As the EP library is built upon an existing MPI implementation, which enforces the non-overlapping condition at the process level, this situation can cause an issue. Before digging into this issue, we first look at the case where the non-overlapping condition is fully respected at the process level.

\begin{center}
\includegraphics[scale=0.3]{intercomm.png}
\end{center}

As shown in the figure, we have two intra-communicators A and B which are totally disjoint at both the thread and the process level. Each of the communicators has a local leader. We also assume that both leaders belong to a peer communicator and have ranks 4 and 9 respectively. To create the inter-communicator, all threads of the left intra-communicator call:
\begin{verbatim}
MPI_Intercomm_create(commA, local_leader = 2, peer_comm,
                     remote_leader = 9, tag, inter_comm)
\end{verbatim}
and all threads of the right intra-communicator call:
\begin{verbatim}
MPI_Intercomm_create(commB, local_leader = 3, peer_comm,
                     remote_leader = 4, tag, inter_comm)
\end{verbatim}

To perform the inter-communicator creation, we follow three steps (a sketch of the process-level part is given at the end of this subsection):
\begin{itemize}
 \item[1.] Determine the leaders and ranks at the process level;
 \item[2.] Call the classic \verb|MPI_Intercomm_create|;
 \item[3.] Create endpoints from the processes and assign them to the threads.
\end{itemize}

\begin{center}
\includegraphics[scale=0.25]{intercomm_step.png}
\end{center}

If processes overlap in the creation of the inter-communicator, we add a \textit{priority check} to assign each shared process to only one intra-communicator. Several cases are possible:
\begin{itemize}
 \item[1.] The process is shared and contains no local leader $\implies$ the process belongs to the group with the higher rank in the peer communicator;
 \item[2.] The process is shared and contains one local leader $\implies$ the process belongs to the group of that leader;
 \item[3.] The process is shared and contains both local leaders: a leader change is performed, the peer communicator is \verb|MPI_COMM_WORLD|, and we denote by ``group A'' the group with the smaller peer rank and by ``group B'' the group with the higher peer rank.
 \begin{itemize}
  \item[3a.] If group A has at least two processes, the leader of group A is changed to the master thread of the process with the smallest rank, excluding the overlapping process. The overlapping process then belongs to group B.
  \item[3b.] If group A has only one process and group B has at least two processes, the leader of group B is changed to the master thread of the process with the smallest rank, excluding the overlapping process. The overlapping process then belongs to group A.
  \item[3c.] If both group A and group B have only one process, a one-process intra-communicator is created, though it will be considered (labeled) as an inter-communicator.
 \end{itemize}
\end{itemize}

\begin{center}
\includegraphics[scale=0.25]{intercomm2.png}
\end{center}
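Setting the overlapping cases aside, the process-level part of the three creation steps listed above can be sketched as follows (master threads only); the leader determination of step 1 is assumed to have been done beforehand, and \verb|create_endpoints_and_assign| is a hypothetical name for the endpoint creation and hand-off of step 3.

\begin{verbatim}
#include <mpi.h>

// Hypothetical helper standing for step 3: create one endpoint per local
// thread on top of the MPI inter-communicator and hand the handles over.
void create_endpoints_and_assign(MPI_Comm mpi_intercomm, int num_ep);

// Steps 2-3, executed by master threads only, once the process-level
// leaders have been deduced in step 1.
void ep_intercomm_create_sketch(MPI_Comm local_mpi_comm, int mpi_local_leader,
                                MPI_Comm peer_mpi_comm, int mpi_remote_leader,
                                int tag, int num_ep)
{
    // 2. Classic inter-communicator creation at the MPI-process level.
    MPI_Comm mpi_intercomm;
    MPI_Intercomm_create(local_mpi_comm, mpi_local_leader,
                         peer_mpi_comm, mpi_remote_leader,
                         tag, &mpi_intercomm);

    // 3. Endpoints are built on top of the MPI inter-communicator and
    //    assigned to the threads of this process.
    create_endpoints_and_assign(mpi_intercomm, num_ep);
}
\end{verbatim}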
\subsection{The merge of inter-communicators}

\verb|MPI_Intercomm_merge(MPI_Comm intercomm, int high, MPI_Comm *newintracomm)| creates an intra-communicator by merging the local and remote groups of an inter-communicator. All processes should provide the same \verb|high| value within each of the two groups. If processes in one group provided the value \verb|high = false| and processes in the other group provided the value \verb|high = true|, then the union orders the ``low'' group before the ``high'' group. If all processes provided the same \verb|high| argument, then the order of the union is arbitrary. This call is blocking and collective within the union of the two groups.~\cite{MPI}

This routine can be considered as the inverse of \verb|MPI_Intercomm_create|. In the inter-communicator creation function, all five cases are eventually reduced to the case where no MPI process is shared by the two groups. It is from this case that the merge function starts.
\begin{itemize}
 \item[1.] The classic \verb|MPI_Intercomm_merge| is called and an MPI intra-communicator is created from the two disjoint groups; the MPI processes are ordered according to the \verb|high| value of their local leader.
 \item[2.] Endpoints are created on top of the MPI intra-communicator; the new EP ranks are ordered first according to the \verb|high| value of each thread and then according to the original EP ranks in the inter-communicator.
\end{itemize}

\begin{center}
\includegraphics[scale=0.25]{merge.png}
\end{center}

\section{P2P communication on inter-communicators}

In the case of inter-communicators, the \verb|MPI_Comm| class has three extra members to determine the topology, along with the original \verb|rank_map|:
\begin{itemize}
 \item \verb|RANK_MAP local_rank_map[size of commA]|: composed of the EP ranks in commA' or commB';
 \item \verb|RANK_MAP remote_rank_map[size of commB]|: $=$ \verb|local_rank_map| of the remote group;
 \item \verb|RANK_MAP intercomm_rank_map[size of commB']|: $=$ \verb|rank_map| of the remote group;
 \item \verb|RANK_MAP rank_map|: rank map of commA' or commB'.
\end{itemize}

For example, in the following configuration:
\begin{center}
\includegraphics[scale = 0.3]{ranks.png}
\end{center}

For all endpoints in commA,
\begin{verbatim}
local_rank_map  = {(rank in commA' or commB',
                    rank of leader in MPI_COMM_WORLD)}
                = {(1,0), (0,1), (2,1), (4,1)}
remote_rank_map = {(remote endpoints' rank in commA' or commB',
                    rank of remote leader in MPI_COMM_WORLD)}
                = {(0,0), (1,1), (3,1), (5,1)}
\end{verbatim}
For all endpoints in commA',
\begin{verbatim}
intercomm_rank_map = {(remote endpoints' local rank in commA' or commB',
                       remote endpoints' MPI rank in commA' or commB')}
                   = {(0,0), (1,0)}
rank_map           = {(local rank in commA', mpi rank in commA')}
                   = {(0,0), (1,0), (0,1), (1,1), (0,2), (1,2)}
\end{verbatim}
For all endpoints in commB,
\begin{verbatim}
local_rank_map  = {(rank in commA' or commB',
                    rank of leader in MPI_COMM_WORLD)}
                = {(0,0), (1,1), (3,1), (5,1)}
remote_rank_map = {(remote endpoints' rank in commA' or commB',
                    rank of remote leader in MPI_COMM_WORLD)}
                = {(1,0), (0,1), (2,1), (4,1)}
\end{verbatim}
For all endpoints in commB',
\begin{verbatim}
intercomm_rank_map = {(remote endpoints' local rank in commA' or commB',
                       remote endpoints' MPI rank in commA' or commB')}
                   = {(0,0), (1,0), (0,1), (1,1), (0,2), (1,2)}
rank_map           = {(local rank in commB', mpi rank in commB')}
                   = {(0,0), (1,0)}
\end{verbatim}

When calling a P2P communication on an inter-communicator, we proceed as follows (see the sketch after this list):
\begin{itemize}
 \item[1.] Determine whether the source and the destination endpoints are in the same group by checking the ``labels'':
 \begin{itemize}
  \item[$\bullet$] \verb|src_label = local_rank_map->at(src).second|
  \item[$\bullet$] \verb|dest_label = remote_rank_map->at(dest).second|
 \end{itemize}
 \item[2.] If \verb|src_label == dest_label|, then the communication is in fact an intra-communication. The new source and destination ranks, as well as the local ranks, are deduced by:
\begin{verbatim}
src_rank        = local_rank_map->at(src).first
dest_rank       = remote_rank_map->at(dest).first
src_rank_local  = rank_map->at(src_rank).first
dest_rank_local = rank_map->at(dest_rank).first
\end{verbatim}
 \item[3.] If \verb|src_label != dest_label|, then an inter-communication is required. The new ranks are obtained by:
\begin{verbatim}
src_rank        = local_rank_map->at(src).first
dest_rank       = remote_rank_map->at(dest).first
src_rank_local  = intercomm_rank_map->at(src_rank).first
dest_rank_local = rank_map->at(dest_rank).first
\end{verbatim}
 \item[4.] Call the MPI P2P function to start the communication:
 \begin{itemize}
  \item[$\bullet$] if intra-communication, \verb|mpi_comm = commA'_mpi or commB'_mpi|;
  \item[$\bullet$] if inter-communication, \verb|mpi_comm = inter_comm_mpi|.
 \end{itemize}
\end{itemize}
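Steps 1--3 amount to the following lookup, written here with \verb|RANK_MAP| assumed to be a \verb|std::map<int, std::pair<int,int>>|, consistent with the description given in the ``EP types'' section.

\begin{verbatim}
#include <map>
#include <utility>

// RANK_MAP as described in the "EP types" section:
// EP rank -> (in-process/local rank, MPI rank).
using RANK_MAP = std::map<int, std::pair<int, int>>;

// Rank translation of steps 1-3. Returns true when the exchange stays inside
// one group (intra-communication on commA' or commB') and fills the
// translated local ranks.
bool translate_intercomm_ranks(int src, int dest,
                               const RANK_MAP &local_rank_map,
                               const RANK_MAP &remote_rank_map,
                               const RANK_MAP &intercomm_rank_map,
                               const RANK_MAP &rank_map,
                               int &src_rank_local, int &dest_rank_local)
{
    // 1. Compare the "labels" (rank of each group's leader in MPI_COMM_WORLD).
    int src_label  = local_rank_map.at(src).second;
    int dest_label = remote_rank_map.at(dest).second;

    int src_rank  = local_rank_map.at(src).first;
    int dest_rank = remote_rank_map.at(dest).first;

    if (src_label == dest_label) {
        // 2. Same group: this is in fact an intra-communication.
        src_rank_local  = rank_map.at(src_rank).first;
        dest_rank_local = rank_map.at(dest_rank).first;
        return true;
    }

    // 3. Different groups: a genuine inter-communication is required.
    src_rank_local  = intercomm_rank_map.at(src_rank).first;
    dest_rank_local = rank_map.at(dest_rank).first;
    return false;
}
\end{verbatim}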
\begin{center}
\includegraphics[scale = 0.3]{sendrecv2.png}
\end{center}

\section{One-sided communications}

One-sided communication is a type of communication in which only one process specifies all communication parameters, both for the sending side and for the receiving side \cite[Chapter~11]{MPI}. Extending this type of communication to the context of endpoints raises some limitations. In the current work, one-sided communication can only be used in client-server mode, which means that RMA (remote memory access) can occur only between a server and a client.

The construction of the RMA windows is illustrated by the following figure:
\begin{center}
\includegraphics[scale=0.5]{RMA_schema.pdf}
\end{center}

\begin{itemize}
 \item we determine the maximum number of threads $N$ in the endpoint environment ($N=3$ in the example);
 \item on the server side, $N$ windows are declared and associated with the same memory address;
 \item we start a loop over $i = 0, \ldots, N-1$:
 \begin{itemize}
  \item each endpoint with thread number $i$ declares an RMA window;
  \item the link between the windows on the client side and the $i$-th window on the server side is created via \verb|MPI_Win_create|;
  \item if the number of threads on a given process is less than $N$, a \verb|NULL| pointer is used as the memory address.
 \end{itemize}
\end{itemize}

With the RMA windows created, we can then perform communications such as \verb|MPI_Put|, \verb|MPI_Get|, \verb|MPI_Accumulate|, \verb|MPI_Get_accumulate|, \verb|MPI_Fetch_and_op|, \verb|MPI_Compare_and_swap|, \textit{etc}. The main idea of all these communications is to identify the threads involved in the connection. For example, suppose we want to perform a put operation from EP 2 to the server. We know that EP 2 is thread 0 of process 1; thus the 0-th window (win A) on the server side should be used. Once the sender and the receiver are identified, the \verb|MPI_Put| communication can be established. Other RMA functions, such as \verb|MPI_Win_allocate| and \verb|MPI_Win_fence|, follow the same idea.

\bibliographystyle{plain}
\bibliography{reference}

\end{document}