source: XIOS/dev/dev_trunk_omp/extern/src_ep_dev/ep_message.cpp @ 1733

Last change on this file since 1733 was 1646, checked in by yushan, 5 years ago

branch merged with trunk @1645. arch file (ep&mpi) added for ADA

File size: 6.7 KB
Line 
1#ifdef _usingEP
2/*!
3   \file ep_message.cpp
4   \since 2 may 2016
5
6   \brief Definitions of MPI endpoint function: Message_Check
7 */
8
9#include "ep_lib.hpp"
10#include <mpi.h>
11#include "ep_declaration.hpp"
12#include "ep_mpi.hpp"
13
14using namespace std;
15
16extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
17#pragma omp threadprivate(EP_PendingRequests)
18
19namespace ep_lib
20{
21  int Request_Check()
22  {
23    if(EP_PendingRequests == 0 ) EP_PendingRequests = new std::list< MPI_Request* >;
24   
25    if(EP_PendingRequests->size() == 0) return 0;
26   
27    show_EP_PendingRequests(EP_PendingRequests);
28   
29    MPI_Status status;
30    MPI_Message *message;
31    int probed = false;
32    int recv_count = 0;
33    std::list<MPI_Request* >::iterator it;
34   
35    show_EP_PendingRequests(EP_PendingRequests);
36   
37   
38    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++)
39    { 
40      if(*(*it) == 0)
41      {
42        EP_PendingRequests->erase(it);
43       
44        memcheck("EP_PendingRequests["<<ep_rank<<"]->size() = " << EP_PendingRequests->size());
45        it = EP_PendingRequests->begin();
46        continue;
47      }
48     
49      if((*(*it))->state == 2)
50      {
51        EP_PendingRequests->erase(it);
52       
53        memcheck("EP_PendingRequests["<<ep_rank<<"]->size() = " << EP_PendingRequests->size());
54        it = EP_PendingRequests->begin();
55        continue;
56      }
57     
58      Message_Check(((*(*it))->comm));
59    }
60   
61   
62
63
64    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); )
65    {
66      if(*(*it) == 0)
67      {
68        EP_PendingRequests->erase(it);
69       
70        memcheck("EP_PendingRequests["<<ep_rank<<"]->size() = " << EP_PendingRequests->size());
71        it = EP_PendingRequests->begin();
72        continue;
73      }
74     
75      if((*(*it))->state == 2)
76      {
77        EP_PendingRequests->erase(it);
78       
79        memcheck("EP_PendingRequests["<<ep_rank<<"]->size() = " << EP_PendingRequests->size());
80        it = EP_PendingRequests->begin();
81        continue;
82      }
83     
84      if((*(*it))->probed == false)
85      {
86        #pragma omp critical (_query0)
87        {
88          MPI_Iprobe_endpoint((*(*it))->ep_src, (*(*it))->ep_tag, ((*(*it))->comm), &probed, &status);
89          if(probed)
90          {
91            message = new MPI_Message;
92            *message = new ep_message;
93       
94            memcheck("new "<< message <<" : in ep_lib::Request_Check, message = new MPI_Message");
95            memcheck("new "<< *message <<" : in ep_lib::Request_Check, *message = new ep_message");
96         
97         
98            MPI_Improbe_endpoint((*(*it))->ep_src, (*(*it))->ep_tag, (*(*it))->comm, &probed, message, &status);
99       
100          }
101        }
102     
103       
104        if(probed)
105        {
106          MPI_Get_count(&status, (*(*it))->ep_datatype, &recv_count);
107         
108          MPI_Imrecv((*(*it))->buf, recv_count, (*(*it))->ep_datatype, message, *it);
109          (*(*it))->type = 3;
110          (*(*it))->probed = true;
111
112          memcheck("delete "<< status.mpi_status <<" : in ep_lib::Request_Check, delete status.mpi_status");
113          delete status.mpi_status;         
114
115          memcheck("delete "<< *message <<" : in ep_lib::Request_Check, delete *message");
116          memcheck("delete "<< message <<" : in ep_lib::Request_Check, delete message");
117
118          delete *message;
119          delete message;
120       
121          it++;
122          continue;     
123        }             
124      }
125     
126      it++;
127    }
128   
129    show_EP_PendingRequests(EP_PendingRequests);
130  }
131 
132 
133 
134  int Message_Check(MPI_Comm comm)
135  {
136    if(comm->is_ep) return Message_Check_endpoint(comm);
137  }
138 
139 
140  int Message_Check_endpoint(MPI_Comm comm)
141  {
142   
143    int flag = true;
144    ::MPI_Status status;
145    ::MPI_Message message;
146
147    while(flag) // loop until the end of global queue
148    {
149      Debug("Message probing for intracomm\n");
150/*     
151      #pragma omp critical (_mpi_call)
152      {
153        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->mpi_comm), &flag, &status);
154        if(flag)
155        {
156          Debug("find message in mpi comm \n");
157          ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->mpi_comm), &message, &status);
158        }
159      }
160
161*/   
162      ::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->mpi_comm), &flag, &message, &status);     
163
164      if(flag)
165      {
166        MPI_Message msg = new ep_message; 
167        msg->mpi_message = new ::MPI_Message(message);
168
169        memcheck("new "<< msg <<" : in ep_lib::Message_Check, msg = new ep_message");
170        memcheck("new "<< msg->mpi_message <<" : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message");
171             
172
173        msg->ep_tag  = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
174        int src_loc  = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
175        int dest_loc = bitset<8> (status.MPI_TAG)           .to_ulong();
176        int src_mpi  = status.MPI_SOURCE;
177             
178        msg->ep_src  = get_ep_rank(comm, src_loc,  src_mpi); 
179
180#ifdef _showinfo
181        printf("status.MPI_TAG = %d, src_loc = %d, dest_loc = %d, ep_tag = %d\n", status.MPI_TAG, src_loc, dest_loc, msg->ep_tag);
182#endif
183
184        msg->mpi_status = new ::MPI_Status(status); 
185        memcheck("new "<< msg->mpi_status <<" : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status");
186
187        #pragma omp critical (_query)
188        {
189          #pragma omp flush
190          comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg); 
191          int dest_mpi = comm->ep_comm_ptr->size_rank_info[2].first;
192          memcheck("message_queue["<<dest_mpi<<","<<dest_loc<<"]->size = "<<comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->size());
193          #pragma omp flush
194        }
195      }
196    }
197
198    return MPI_SUCCESS;
199  }
200
201 
202  void show_EP_PendingRequests(std::list< ep_lib::MPI_Request* > * EP_PendingRequest)
203  {
204#ifdef _showinfo
205    std::list<MPI_Request* >::iterator it;
206   
207    int i=0;
208   
209    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++)
210    { 
211      if(*(*it) == 0)
212      {
213        EP_PendingRequests->erase(it);
214           
215        memcheck("EP_PendingRequests["<<ep_rank<<"]->size() = " << EP_PendingRequests->size());
216        it = EP_PendingRequests->begin();
217        continue;
218      }
219     
220      if((*(*it))->state == 2)
221      {
222        EP_PendingRequests->erase(it);
223       
224        memcheck("EP_PendingRequests["<<ep_rank<<"]->size() = " << EP_PendingRequests->size());
225        it = EP_PendingRequests->begin();
226        continue;
227      }
228         
229      if((*(*it))->ep_src>6)
230        printf("EP_PendingRequests[%d] : ep_src = %d, ep_tag = %d, type = %d, state = %d, comm = %d\n", i, (*(*it))->ep_src, (*(*it))->ep_tag, (*(*it))->type, (*(*it))->state, to_mpi_comm(((*(*it))->comm)->mpi_comm));
231      i++;
232    }
233#endif
234
235  }
236
237 
238
239}
240#endif
Note: See TracBrowser for help on using the repository browser.