source: XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_recv.cpp @ 1185

Last change on this file since 1185 was 1185, checked in by yushan, 7 years ago

save dev. recv_test OK

File size: 5.5 KB
Line 
1/*!
2   \file ep_recv.cpp
3   \since 2 may 2016
4
5   \brief Definitions of MPI receive functions: MPI_Recv, MPI_Mrecv, MPI_Irecv, MPI_Imrecv
6 */
7
8
9#include "ep_lib.hpp"
10#include <mpi.h>
11#include "ep_declaration.hpp"
12
13using namespace std;
14
15
16namespace ep_lib
17{
18 
19
20  int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status *status)
21  {
22    int dest_rank;
23    MPI_Comm_rank(comm, &dest_rank);
24
25    if(!comm.is_ep)
26    {
27      ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm >(comm.mpi_comm);
28      ::MPI_Status mpi_status;
29      ::MPI_Recv(buf, count, static_cast< ::MPI_Datatype >(datatype), src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, mpi_comm, &mpi_status);
30
31      status->ep_src = src;
32      status->ep_tag = tag;
33      status->ep_datatype = datatype;
34
35      return 0; 
36    }
37
38    Message_Check(comm);
39
40    MPI_Request request;
41    MPI_Irecv(buf, count, datatype, src, tag, comm, &request);
42    MPI_Wait(&request, status);
43
44    return 0;
45  }
46
47
48
49
50  int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Request *request)
51  {
52
53    Debug("MPI_Irecv with EP");
54    int dest_rank;
55    MPI_Comm_rank(comm, &dest_rank);
56
57    if(!comm.is_ep)
58    {
59      ::MPI_Request mpi_request;
60      ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm > (comm.mpi_comm);
61      ::MPI_Irecv(buf, count, static_cast< ::MPI_Datatype> (datatype), src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, mpi_comm, &mpi_request);
62
63      request->mpi_request = mpi_request;
64      request->ep_src = src;
65      request->ep_datatype = datatype;
66      request->ep_tag = tag;
67    }
68
69    request->mpi_request = MPI_REQUEST_NULL_STD;
70    request->buf = buf;
71    request->comm = comm;
72    request->type = 2;
73
74    request->ep_src = src;
75    request->ep_tag = tag;
76    request->ep_datatype = datatype;
77
78    /* With Improbe*/
79    Message_Check(comm);
80
81    EP_PendingRequests->push_back(request);
82    printf("proc %d : EP_PendingRequests insert one request, add = %p(%p), buf = %p(%p)\n", dest_rank, EP_PendingRequests->back(), request, buf, request->buf);
83   
84    // check all EP_PendingRequests
85   
86    //if(EP_PendingRequests == 0 ) EP_PendingRequests = new std::list< MPI_Request* >;
87    if(!EP_PendingRequests->empty())
88    {
89      printf("proc %d have %d pending irecv request\n", dest_rank, EP_PendingRequests->size());
90     
91      for(std::list<MPI_Request* >::iterator it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); )
92      {
93        if((*it)->type != 2) 
94        {
95          printf("proc %d : pending request type = %d, src= %d, tag = %d, add = %p skip\n", dest_rank, (*it)->type, (*it)->ep_src, (*it)->ep_tag, *it);
96          EP_PendingRequests->erase(it);
97          it = EP_PendingRequests->begin();
98          printf("proc %d : pending request processed, size = %d, it = %p\n", dest_rank, EP_PendingRequests->size(), *it);
99          continue;
100        }
101       
102        printf("proc %d : pending irecv request src = %d, tag = %d, type = %d, add = %p\n", dest_rank, (*it)->ep_src, (*it)->ep_tag, (*it)->type, *it);
103        int probed = false;
104        MPI_Message pending_message;
105        MPI_Status pending_status;
106   
107        MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &pending_message, &pending_status);
108        printf("proc %d : pending irecv request probed to be %d, add = %p\n",dest_rank, probed, *it);
109   
110        if(probed) 
111        { 
112          int count;
113          MPI_Get_count(&pending_status, (*it)->ep_datatype, &count);
114          MPI_Imrecv((*it)->buf, count, (*it)->ep_datatype, &pending_message, *it);
115          printf("proc %d : pending request is imrecving src = %d, tag = %d, add = %p, buf = %p, count = %d\n", dest_rank, (*it)->ep_src, (*it)->ep_tag, *it, (*it)->buf, count);
116          EP_PendingRequests->erase(it);
117          it = EP_PendingRequests->begin();
118          printf("proc %d : pending request processed, size = %d\n", dest_rank, EP_PendingRequests->size());
119          continue;
120        }
121        else it++;
122      }
123    }
124/*
125    int flag = false;
126    MPI_Message message;
127    MPI_Status status;
128   
129    MPI_Improbe(src, tag, comm, &flag, &message, &status);
130           
131    if(flag)
132    {
133      MPI_Imrecv(buf, count, datatype, &message, request);
134      printf("proc %d : found message in local queue, src = %d, tag = %d\n", dest_rank, src, tag);
135    }
136  */ 
137
138    return 0;
139  }
140
141  int MPI_Imrecv(void *buf, int count, MPI_Datatype datatype, MPI_Message *message, MPI_Request *request)
142  {
143    Debug("MPI_Imrecv");
144
145    request->type = 3;
146
147    ::MPI_Request mpi_request;
148    ::MPI_Message mpi_message = static_cast< ::MPI_Message >(message->mpi_message);
149               
150    ::MPI_Imrecv(buf, count, static_cast< ::MPI_Datatype>(datatype), &mpi_message, &mpi_request);
151
152    request->mpi_request = mpi_request;
153    request->ep_datatype = datatype;
154    request->ep_tag = message->ep_tag;
155    request->ep_src = message->ep_src;
156    //request->buf = buf;
157
158    return 0;
159  }
160
161
162  int MPI_Mrecv(void *buf, int count, MPI_Datatype datatype, MPI_Message *message, MPI_Status *status)
163  {
164    Debug("EP Mrecv called\n");
165
166    ::MPI_Status mpi_status;
167    ::MPI_Message mpi_message = static_cast< ::MPI_Message >(message->mpi_message);
168   
169    ::MPI_Mrecv(buf, count, static_cast< ::MPI_Datatype>(datatype), &mpi_message, &mpi_status);
170
171    status->mpi_status = new ::MPI_Status(mpi_status);
172    status->ep_src = message->ep_src;
173    status->ep_datatype = datatype;
174    status->ep_tag = message->ep_tag;
175
176    //check_sum_recv(buf, count, datatype, message->ep_src, message->ep_tag);
177
178    return 0;
179  }
180
181}
182
183
Note: See TracBrowser for help on using the repository browser.