source: XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_recv.cpp @ 1187

Last change on this file since 1187 was 1187, checked in by yushan, 7 years ago

add pending list for irecv

File size: 5.3 KB
Line 
1/*!
2   \file ep_recv.cpp
3   \since 2 may 2016
4
5   \brief Definitions of MPI receive functions: MPI_Recv, MPI_Mrecv, MPI_Irecv, MPI_Imrecv
6 */
7
8
9#include "ep_lib.hpp"
10#include <mpi.h>
11#include "ep_declaration.hpp"
12
13using namespace std;
14
15
16namespace ep_lib
17{
18 
19
20  int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status *status)
21  {
22    int dest_rank;
23    MPI_Comm_rank(comm, &dest_rank);
24
25    if(!comm.is_ep)
26    {
27      ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm >(comm.mpi_comm);
28      ::MPI_Status mpi_status;
29      ::MPI_Recv(buf, count, static_cast< ::MPI_Datatype >(datatype), src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, mpi_comm, &mpi_status);
30
31      status->ep_src = src;
32      status->ep_tag = tag;
33      status->ep_datatype = datatype;
34
35      return 0; 
36    }
37
38    Message_Check(comm);
39
40    MPI_Request request;
41    MPI_Irecv(buf, count, datatype, src, tag, comm, &request);
42    MPI_Wait(&request, status);
43
44    return 0;
45  }
46
47
48
49
50  int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Request *request)
51  {
52
53    Debug("MPI_Irecv with EP");
54    int dest_rank;
55    MPI_Comm_rank(comm, &dest_rank);
56
57    if(!comm.is_ep)
58    {
59      ::MPI_Request mpi_request;
60      ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm > (comm.mpi_comm);
61      ::MPI_Irecv(buf, count, static_cast< ::MPI_Datatype> (datatype), src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, mpi_comm, &mpi_request);
62
63      request->mpi_request = mpi_request;
64      request->ep_src = src;
65      request->ep_datatype = datatype;
66      request->ep_tag = tag;
67    }
68
69    request->mpi_request = MPI_REQUEST_NULL_STD;
70    request->buf = buf;
71    request->comm = comm;
72    request->type = 2;
73
74    request->ep_src = src;
75    request->ep_tag = tag;
76    request->ep_datatype = datatype;
77
78
79
80    /* With Improbe*/
81    Message_Check(comm);
82
83    if(EP_PendingRequests == 0 ) 
84    {
85      EP_PendingRequests = new std::list< MPI_Request* >; 
86      printf("proc %d : EP_PendingRequests allocated, add = %p\n", dest_rank, EP_PendingRequests); 
87    }
88
89    request->pending_ptr = EP_PendingRequests;
90    printf("proc %d : &EP_PendingRequests add = %p, ptr = %p\n", dest_rank, EP_PendingRequests, request->pending_ptr); 
91   
92    EP_PendingRequests->push_back(request);
93    //printf("proc %d : EP_PendingRequests insert one request, src = %d, tag = %d, size = %d\n", dest_rank, request->ep_src, request->ep_tag, EP_PendingRequests->size());
94   
95    // check all EP_PendingRequests
96   
97    //printf("proc %d have %d pending irecv request\n", dest_rank, EP_PendingRequests->size());
98     
99    for(std::list<MPI_Request* >::iterator it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); )
100    {
101      if((*it)->type == 3) 
102      {
103        //printf("proc %d : pending request type = %d, src= %d, tag = %d    skip\n", dest_rank, (*it)->type, (*it)->ep_src, (*it)->ep_tag);
104        EP_PendingRequests->erase(it);
105        it = EP_PendingRequests->begin();
106        //printf("proc %d : pending request processed, size = %d\n", dest_rank, EP_PendingRequests->size());
107        continue;
108      }
109       
110      //printf("proc %d : pending irecv request src = %d, tag = %d, type = %d\n", dest_rank, (*it)->ep_src, (*it)->ep_tag, (*it)->type);
111      int probed = false;
112      MPI_Message pending_message;
113      MPI_Status pending_status;
114   
115      MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &pending_message, &pending_status);
116      //printf("proc %d : pending irecv request probed to be %d, src = %d, tag = %d\n",dest_rank, probed, (*it)->ep_src, (*it)->ep_tag);
117   
118      if(probed) 
119      { 
120        int count;
121        MPI_Get_count(&pending_status, (*it)->ep_datatype, &count);
122        MPI_Imrecv((*it)->buf, count, (*it)->ep_datatype, &pending_message, *it);
123        //printf("proc %d : pending request is imrecving src = %d, tag = %d, count = %d\n", dest_rank, (*it)->ep_src, (*it)->ep_tag, count);
124        EP_PendingRequests->erase(it);
125        it = EP_PendingRequests->begin();
126        //printf("proc %d : pending request processed, size = %d\n", dest_rank, EP_PendingRequests->size());
127        continue;
128      }
129
130      it++;
131    }
132   
133    return 0;
134  }
135
136  int MPI_Imrecv(void *buf, int count, MPI_Datatype datatype, MPI_Message *message, MPI_Request *request)
137  {
138    Debug("MPI_Imrecv");
139
140    request->type = 3;
141
142    ::MPI_Request mpi_request;
143    ::MPI_Message mpi_message = static_cast< ::MPI_Message >(message->mpi_message);
144               
145    ::MPI_Imrecv(buf, count, static_cast< ::MPI_Datatype>(datatype), &mpi_message, &mpi_request);
146
147    request->mpi_request = mpi_request;
148    request->ep_datatype = datatype;
149    request->ep_tag = message->ep_tag;
150    request->ep_src = message->ep_src;
151    //request->buf = buf;
152
153    return 0;
154  }
155
156
157  int MPI_Mrecv(void *buf, int count, MPI_Datatype datatype, MPI_Message *message, MPI_Status *status)
158  {
159    Debug("EP Mrecv called\n");
160
161    ::MPI_Status mpi_status;
162    ::MPI_Message mpi_message = static_cast< ::MPI_Message >(message->mpi_message);
163   
164    ::MPI_Mrecv(buf, count, static_cast< ::MPI_Datatype>(datatype), &mpi_message, &mpi_status);
165
166    status->mpi_status = new ::MPI_Status(mpi_status);
167    status->ep_src = message->ep_src;
168    status->ep_datatype = datatype;
169    status->ep_tag = message->ep_tag;
170
171    //check_sum_recv(buf, count, datatype, message->ep_src, message->ep_tag);
172
173    return 0;
174  }
175
176}
177
178
Note: See TracBrowser for help on using the repository browser.