source: XIOS/dev/branch_openmp/extern/ep_dev/ep_message.cpp @ 1517

Last change on this file since 1517 was 1517, checked in by yushan, 6 years ago

save dev

File size: 7.2 KB
Line 
1/*!
2   \file ep_message.cpp
3   \since 2 may 2016
4
5   \brief Definitions of MPI endpoint function: Message_Check
6 */
7
8#include "ep_lib.hpp"
9#include <mpi.h>
10#include "ep_declaration.hpp"
11#include "ep_mpi.hpp"
12
13using namespace std;
14
15extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
16#pragma omp threadprivate(EP_PendingRequests)
17
18namespace ep_lib
19{
20  int Request_Check()
21  {
22    if(EP_PendingRequests == 0 ) EP_PendingRequests = new std::list< MPI_Request* >;
23   
24    if(EP_PendingRequests->size() == 0) return 0;
25   
26    MPI_Status status;
27    MPI_Message *message;
28    int probed = false;
29    int recv_count = 0;
30    std::list<MPI_Request* >::iterator it;
31   
32   
33    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++)
34    { 
35      Message_Check(((*(*it))->comm));
36    }
37
38
39    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); )
40    {
41      if((*(*it))->state == 0)
42      {
43        #pragma omp critical (_query0)
44        {
45          MPI_Iprobe((*(*it))->ep_src, (*(*it))->ep_tag, ((*(*it))->comm), &probed, &status);
46          if(probed)
47          {
48            message = new MPI_Message;
49            *message = new ep_message;
50       
51            memcheck("new "<< message <<" : in ep_lib::Request_Check, message = new MPI_Message");
52            memcheck("new "<< *message <<" : in ep_lib::Request_Check, *message = new ep_message");
53         
54         
55            MPI_Improbe((*(*it))->ep_src, (*(*it))->ep_tag, (*(*it))->comm, &probed, message, &status);
56       
57          }
58        }
59     
60       
61        if(probed)
62        {
63          MPI_Get_count(&status, (*(*it))->ep_datatype, &recv_count);
64         
65          MPI_Imrecv((*(*it))->buf, recv_count, (*(*it))->ep_datatype, message, *it);
66          (*(*it))->type = 3;
67          (*(*it))->state = 1;
68
69          memcheck("delete "<< status.mpi_status <<" : in ep_lib::Request_Check, delete status.mpi_status");
70          delete status.mpi_status;         
71
72          memcheck("delete "<< *message <<" : in ep_lib::Request_Check, delete *message");
73          memcheck("delete "<< message <<" : in ep_lib::Request_Check, delete message");
74
75          delete *message;
76          delete message;
77       
78          it++;
79          continue;     
80        }             
81      }
82     
83      if((*(*it))->state == 2)
84      {
85        int ep_rank = ((*(*it))->comm)->ep_comm_ptr->size_rank_info[0].first;
86        memcheck("delete "<< (*(*it)) <<" : in ep_lib::Request_Check, delete (*(*it))");
87        delete (*(*it));
88       
89        EP_PendingRequests->erase(it);
90        memcheck("EP_PendingRequests["<<ep_rank<<"]->size() = " << EP_PendingRequests->size());
91        it = EP_PendingRequests->begin();
92        continue;
93      }
94      else it++;
95    }
96  }
97 
98 
99 
100  int Message_Check(MPI_Comm comm)
101  {
102    if(!comm->is_ep) return MPI_SUCCESS;
103
104    if(comm->is_intercomm)
105    {
106      Message_Check_intercomm(comm);
107    }
108   
109    return Message_Check_intracomm(comm);
110
111  }
112 
113 
114  int Message_Check_intracomm(MPI_Comm comm)
115  {
116   
117    int flag = true;
118    ::MPI_Status status;
119    ::MPI_Message message;
120
121    while(flag) // loop until the end of global queue
122    {
123      Debug("Message probing for intracomm\n");
124     
125      #pragma omp critical (_mpi_call)
126      {
127        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->mpi_comm), &flag, &status);
128        if(flag)
129        {
130          Debug("find message in mpi comm \n");
131          ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->mpi_comm), &message, &status);
132        }
133      }
134
135     
136      if(flag)
137      {
138        MPI_Message msg = new ep_message; 
139        msg->mpi_message = new ::MPI_Message(message);
140
141        memcheck("new "<< msg <<" : in ep_lib::Message_Check, msg = new ep_message");
142        memcheck("new "<< msg->mpi_message <<" : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message");
143             
144
145        msg->ep_tag  = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
146        int src_loc  = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
147        int dest_loc = bitset<8> (status.MPI_TAG)           .to_ulong();
148        int src_mpi  = status.MPI_SOURCE;
149             
150        msg->ep_src  = get_ep_rank(comm, src_loc,  src_mpi); 
151
152#ifdef _showinfo
153        printf("status.MPI_TAG = %d, src_loc = %d, dest_loc = %d, ep_tag = %d\n", status.MPI_TAG, src_loc, dest_loc, msg->ep_tag);
154#endif
155
156        msg->mpi_status = new ::MPI_Status(status); 
157        memcheck("new "<< msg->mpi_status <<" : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status");
158
159        #pragma omp critical (_query)
160        {
161          #pragma omp flush
162          comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg); 
163          int dest_mpi = comm->ep_comm_ptr->size_rank_info[2].first;
164          memcheck("message_queue["<<dest_mpi<<","<<dest_loc<<"]->size = "<<comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->size());
165          #pragma omp flush
166        }
167      }
168    }
169
170    return MPI_SUCCESS;
171  }
172
173
174 
175 
176 
177
178
179  int Message_Check_intercomm(MPI_Comm comm)
180  {
181    if(!comm->ep_comm_ptr->intercomm->mpi_inter_comm) return 0;
182
183    Debug("Message probing for intercomm\n");
184
185    int flag = true;
186    ::MPI_Message message;
187    ::MPI_Status status;
188    int current_ep_rank;
189    MPI_Comm_rank(comm, &current_ep_rank);
190
191    while(flag) // loop until the end of global queue "comm->ep_comm_ptr->intercomm->mpi_inter_comm"
192    {
193      Debug("Message probing for intracomm\n");
194
195      #pragma omp critical (_mpi_call)
196      {
197        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status);
198        if(flag)
199        {
200          Debug("find message in mpi comm \n");
201          ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status);
202        }
203      }
204     
205
206      if(flag)
207      {
208
209        MPI_Message msg = new ep_message; 
210        msg->mpi_message = new ::MPI_Message(message);
211
212        memcheck("new "<< msg <<" : in ep_lib::Message_Check, msg = new ep_message");
213        memcheck("new "<< msg->mpi_message <<" : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message");
214             
215
216        msg->ep_tag  = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
217        int src_loc  = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
218        int dest_loc = bitset<8> (status.MPI_TAG)           .to_ulong();
219        int src_mpi  = status.MPI_SOURCE;
220             
221        msg->ep_src  = get_ep_rank(comm, src_loc,  src_mpi);   
222#ifdef _showinfo
223        printf("status.MPI_TAG = %d, src_loc = %d, dest_loc = %d, ep_tag = %d\n", status.MPI_TAG, src_loc, dest_loc, msg->ep_tag);
224#endif
225
226        msg->mpi_status = new ::MPI_Status(status); 
227        memcheck("new "<< msg->mpi_status <<" : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status");
228
229        #pragma omp critical (_query)
230        {
231          #pragma omp flush
232          comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg);
233          memcheck("comm->ep_comm_ptr->comm_list["<<dest_loc<<"]->ep_comm_ptr->message_queue->size = "<<comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->size());
234          #pragma omp flush
235        }
236      }
237    }
238
239    Message_Check_intracomm(comm);
240
241    return MPI_SUCCESS;
242  }
243
244 
245
246}
Note: See TracBrowser for help on using the repository browser.