source: XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_wait.cpp @ 1187

Last change on this file since 1187 was 1187, checked in by yushan, 7 years ago

add pending list for irecv

File size: 7.9 KB
Line 
1/*!
2  \file ep_wait.cpp
3  \since 2 may 2016
4
5  \brief Definitions of MPI wait function: MPI_Wait, MPI_Waitall, MPI_Waitsome, MPI_Waitany
6  */
7
8#include "ep_lib.hpp"
9#include <mpi.h>
10#include "ep_declaration.hpp"
11
12using namespace std;
13
14
15
16namespace ep_lib
17{       
18
19  int MPI_Wait(MPI_Request *request, MPI_Status *status)
20  {
21
22    if(request->type == 1)  //=>isend
23    {
24      ::MPI_Request mpi_request = static_cast< ::MPI_Request >(request->mpi_request);
25      ::MPI_Status mpi_status;
26      ::MPI_Errhandler_set(MPI_COMM_WORLD_STD, MPI_ERRORS_RETURN);
27      int error_code = ::MPI_Wait(&mpi_request, &mpi_status);
28      if (error_code != MPI_SUCCESS) {
29     
30         char error_string[BUFSIZ];
31         int length_of_error_string, error_class;
32     
33         ::MPI_Error_class(error_code, &error_class);
34         ::MPI_Error_string(error_class, error_string, &length_of_error_string);
35         printf("%s\n", error_string);
36      }
37
38      status->mpi_status = &mpi_status;
39      status->ep_src = request->ep_src;
40      status->ep_tag = request->ep_tag;
41      status->ep_datatype = request->ep_datatype;
42
43      return 0;
44    }
45
46    if(request->type == 3) //=>imrecv
47    {
48      ::MPI_Request mpi_request = static_cast< ::MPI_Request >(request->mpi_request);
49      ::MPI_Status mpi_status;
50      ::MPI_Errhandler_set(MPI_COMM_WORLD_STD, MPI_ERRORS_RETURN);
51      int error_code = ::MPI_Wait(&mpi_request, &mpi_status);
52      if (error_code != MPI_SUCCESS) {
53     
54         char error_string[BUFSIZ];
55         int length_of_error_string, error_class;
56     
57         ::MPI_Error_class(error_code, &error_class);
58         ::MPI_Error_string(error_class, error_string, &length_of_error_string);
59         printf("%s\n", error_string);
60      }
61     
62
63      status->mpi_status = new ::MPI_Status(mpi_status);
64      status->ep_src = request->ep_src;
65      status->ep_tag = request->ep_tag;
66      status->ep_datatype = request->ep_datatype;
67
68      //check_sum_recv(request->buf, count, request->ep_datatype, request->ep_src, request->ep_tag, request->comm, 2);
69      return 0;
70
71    }
72
73    if(request->type == 2) //=>irecv not probed
74    {
75     
76      while(true)
77      {
78        Message_Check(request->comm);
79        // parcours pending list
80        for(std::list<MPI_Request* >::iterator it = (request->pending_ptr)->begin(); it!=(request->pending_ptr)->end(); )
81        {
82          if(*it == request)
83          { 
84            int probed = false;
85            MPI_Message message;
86
87            MPI_Improbe(request->ep_src, request->ep_tag, request->comm, &probed, &message, status);
88                 
89            if(probed)
90            {
91              int recv_count;
92              MPI_Get_count(status, request->ep_datatype, &recv_count);
93              MPI_Mrecv(request->buf, recv_count, request->ep_datatype, &message, status);
94              (request->pending_ptr)->erase(it);
95              //printf("wait  : pending request processed, size = %d\n", (request->pending_ptr)->size());
96              request->type = 3;
97           
98              return 0;
99            }
100
101            it++;
102          }
103          else 
104          {
105            int probed = false;
106            MPI_Message message;
107            MPI_Status status;
108
109            MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status);
110                 
111            if(probed)
112            {
113              int recv_count;
114              MPI_Get_count(&status, (*it)->ep_datatype, &recv_count);
115              MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it);
116                 
117              (request->pending_ptr)->erase(it);
118                 
119              it = (request->pending_ptr)->begin();
120              //printf("wait  : pending request processed, size = %d\n", (request->pending_ptr)->size());
121            }
122            else it++;
123          }
124        }
125
126      }
127     
128
129    }
130
131    return MPI_SUCCESS;
132
133  }   /*end of mpi_wait*/
134
135
136
137
138
139
140  int MPI_Waitall(int count, MPI_Request *array_of_requests, MPI_Status *array_of_statuses)
141  {
142    //int dest_rank;
143    //MPI_Comm_rank(MPI_COMM_WORLD, &dest_rank);
144    //printf("proc %d enters waitall\n", dest_rank);
145
146    int finished = 0;
147    int finished_index[count];
148
149    for(int i=0; i<count; i++)
150      printf("pending add = %p\n",  array_of_requests[i].pending_ptr);
151
152    //if(EP_PendingRequests == 0) EP_PendingRequests = new std::list< MPI_Request* >; 
153    //printf("pending size = %d, add = %p\n", EP_PendingRequests->size(), EP_PendingRequests);
154
155    for(int i=0; i<count; i++)
156    {
157      finished_index[i] = false;
158    }
159
160    while(finished < count)
161    {
162     
163      for(int i=0; i<count; i++)
164      {
165        if(finished_index[i] == false) // this request has not been tested.
166        {
167          if(array_of_requests[i].type == 1 || array_of_requests[i].type == 3) // isend or imrecv
168          {     
169            //MPI_Wait(&array_of_requests[i], &array_of_statuses[i]);
170            int tested;
171            MPI_Test(&array_of_requests[i], &tested, &array_of_statuses[i]);
172            if(!tested) MPI_Wait(&array_of_requests[i], &array_of_statuses[i]);
173            finished++;
174            finished_index[i] = true;
175          }
176          else // irecv
177          {
178           
179            Message_Check(array_of_requests[i].comm);
180            // parcours pending list
181            for(std::list<MPI_Request* >::iterator it = (array_of_requests[i].pending_ptr)->begin(); it!=(array_of_requests[i].pending_ptr)->end(); )
182            {
183              bool matched = false;
184              for(int j=0; j<count; j++)
185              {
186                if(*it == &array_of_requests[j])
187                { 
188                  int probed = false;
189                  MPI_Message message;
190
191                  MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &array_of_statuses[j]);
192                 
193                  if(probed)
194                  {
195                    int recv_count;
196                    MPI_Get_count(&array_of_statuses[j], array_of_requests[j].ep_datatype, &recv_count);
197                    MPI_Mrecv(array_of_requests[j].buf, recv_count, array_of_requests[j].ep_datatype, &message, &array_of_statuses[j]);
198                    //check_sum_recv(array_of_requests[i].buf, recv_count, array_of_requests[i].ep_datatype, array_of_requests[i].ep_src, array_of_requests[i].ep_tag, array_of_requests[i].comm, 2);
199                    (array_of_requests[i].pending_ptr)->erase(it);
200                    array_of_requests[j].type = 3;
201                    finished++;
202                    finished_index[j] = true;
203                    matched = true;
204                    it = (array_of_requests[i].pending_ptr)->begin();
205                    j=count;
206                    //printf("waitall  : pending request processed, size = %d\n", (*(array_of_requests[i].pending_ptr))->size());
207                    //printf("matched = %d, j=%d, src = %d, tag = %d, probed = %d\n", matched, j, (*it)->ep_src, (*it)->ep_tag, probed);
208                  }
209                }
210
211              }
212
213              if(!matched)
214              {
215                int probed = false;
216                MPI_Message message;
217                MPI_Status status;
218
219                MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status);
220                 
221                if(probed)
222                {
223                  int recv_count;
224                  MPI_Get_count(&status, (*it)->ep_datatype, &recv_count);
225                  MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it);
226                 
227                  (array_of_requests[i].pending_ptr)->erase(it);
228                 
229                  it = (array_of_requests[i].pending_ptr)->begin();
230                  //printf("waitall  : pending request processed, size = %d\n", (*(array_of_requests[i].pending_ptr))->size());
231                }
232                else it++;
233              }
234            }
235
236          }
237        }
238      }   
239    }
240    //printf("proc %d exits waitall\n", dest_rank);
241    return MPI_SUCCESS;
242  }  /* end of mpi_waitall*/
243
244
245}
Note: See TracBrowser for help on using the repository browser.