source: XIOS/dev/branch_openmp/extern/ep_dev/ep_message.cpp @ 1506

Last change on this file since 1506 was 1503, checked in by yushan, 6 years ago

rank_map is passed from vector to map, in order to have more flexibility in comm_split

File size: 7.1 KB
Line 
1/*!
2   \file ep_message.cpp
3   \since 2 may 2016
4
5   \brief Definitions of MPI endpoint function: Message_Check
6 */
7
8#include "ep_lib.hpp"
9#include <mpi.h>
10#include "ep_declaration.hpp"
11#include "ep_mpi.hpp"
12
13using namespace std;
14
15extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
16#pragma omp threadprivate(EP_PendingRequests)
17
18namespace ep_lib
19{
20  int Request_Check()
21  {
22    if(EP_PendingRequests == 0 ) EP_PendingRequests = new std::list< MPI_Request* >;
23   
24    if(EP_PendingRequests->size() == 0) return 0;
25   
26    MPI_Status status;
27    MPI_Message *message;
28    int probed = false;
29    int recv_count = 0;
30    std::list<MPI_Request* >::iterator it;
31   
32   
33    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++)
34    { 
35      Message_Check(((*(*it))->comm));
36    }
37
38
39    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); )
40    {
41      if((*(*it))->state == 0)
42      {
43        #pragma omp critical (_query0)
44        {
45          MPI_Iprobe((*(*it))->ep_src, (*(*it))->ep_tag, ((*(*it))->comm), &probed, &status);
46          if(probed)
47          {
48            message = new MPI_Message;
49            *message = new ep_message;
50       
51            memcheck("new "<< message <<" : in ep_lib::Request_Check, message = new MPI_Message");
52            memcheck("new "<< *message <<" : in ep_lib::Request_Check, *message = new ep_message");
53         
54         
55            MPI_Improbe((*(*it))->ep_src, (*(*it))->ep_tag, (*(*it))->comm, &probed, message, &status);
56       
57          }
58        }
59     
60       
61        if(probed)
62        {
63          ::MPI_Get_count(to_mpi_status_ptr(status), to_mpi_type((*(*it))->ep_datatype), &recv_count);
64         
65          MPI_Imrecv((*(*it))->buf, recv_count, (*(*it))->ep_datatype, message, *it);
66          (*(*it))->type = 3;
67          (*(*it))->state = 1;
68
69          memcheck("delete "<< status.mpi_status <<" : in ep_lib::Request_Check, delete status.mpi_status");
70          delete status.mpi_status;         
71
72          memcheck("delete "<< *message <<" : in ep_lib::Request_Check, delete *message");
73          memcheck("delete "<< message <<" : in ep_lib::Request_Check, delete message");
74
75          delete *message;
76          delete message;
77       
78          it++;
79          continue;     
80        }             
81      }
82     
83      if((*(*it))->state == 2)
84      {
85        int ep_rank = ((*(*it))->comm)->ep_comm_ptr->size_rank_info[0].first;
86        memcheck("delete "<< (*(*it)) <<" : in ep_lib::Request_Check, delete (*(*it))");
87        delete (*(*it));
88       
89        EP_PendingRequests->erase(it);
90        memcheck("EP_PendingRequests["<<ep_rank<<"]->size() = " << EP_PendingRequests->size());
91        it = EP_PendingRequests->begin();
92        continue;
93      }
94      else it++;
95    }
96  }
97 
98 
99 
100  int Message_Check(MPI_Comm comm)
101  {
102    if(!comm->is_ep) return MPI_SUCCESS;
103
104    if(comm->is_intercomm)
105    {
106      return  Message_Check_intercomm(comm);
107    }
108   
109    return Message_Check_intracomm(comm);
110
111  }
112 
113 
114  int Message_Check_intracomm(MPI_Comm comm)
115  {
116   
117    int flag = true;
118    ::MPI_Status status;
119    ::MPI_Message message;
120
121    while(flag) // loop until the end of global queue
122    {
123      Debug("Message probing for intracomm\n");
124     
125      #pragma omp critical (_mpi_call)
126      {
127        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->mpi_comm), &flag, &status);
128        if(flag)
129        {
130          Debug("find message in mpi comm \n");
131          ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->mpi_comm), &message, &status);
132        }
133      }
134
135     
136      if(flag)
137      {
138        MPI_Message msg = new ep_message; 
139        msg->mpi_message = new ::MPI_Message(message);
140
141        memcheck("new "<< msg <<" : in ep_lib::Message_Check, msg = new ep_message");
142        memcheck("new "<< msg->mpi_message <<" : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message");
143             
144
145        msg->ep_tag  = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
146        int src_loc  = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
147        int dest_loc = bitset<8> (status.MPI_TAG)           .to_ulong();
148        int src_mpi  = status.MPI_SOURCE;
149             
150        msg->ep_src  = get_ep_rank(comm, src_loc,  src_mpi);       
151        msg->mpi_status = new ::MPI_Status(status); 
152        memcheck("new "<< msg->mpi_status <<" : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status");
153
154        #pragma omp critical (_query)
155        {
156          #pragma omp flush
157          comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg); 
158          int dest_mpi = comm->ep_comm_ptr->size_rank_info[2].first;
159          memcheck("message_queue["<<dest_mpi<<","<<dest_loc<<"]->size = "<<comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->size());
160          #pragma omp flush
161        }
162      }
163    }
164
165    return MPI_SUCCESS;
166  }
167
168
169 
170 
171 
172
173
174  int Message_Check_intercomm(MPI_Comm comm)
175  {
176    if(!comm->ep_comm_ptr->intercomm->mpi_inter_comm) return 0;
177
178    Debug("Message probing for intercomm\n");
179
180    int flag = true;
181    ::MPI_Message message;
182    ::MPI_Status status;
183    int current_ep_rank;
184    MPI_Comm_rank(comm, &current_ep_rank);
185
186    while(flag) // loop until the end of global queue "comm->ep_comm_ptr->intercomm->mpi_inter_comm"
187    {
188      Debug("Message probing for intracomm\n");
189
190      #pragma omp critical (_mpi_call)
191      {
192        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status);
193        if(flag)
194        {
195          Debug("find message in mpi comm \n");
196          ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status);
197        }
198      }
199     
200
201      if(flag)
202      {
203
204        MPI_Message msg = new ep_message; 
205        msg->mpi_message = new ::MPI_Message(message);
206
207        memcheck("new "<< msg <<" : in ep_lib::Message_Check, msg = new ep_message");
208        memcheck("new "<< msg->mpi_message <<" : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message");
209             
210
211        msg->ep_tag  = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
212        int src_loc  = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
213        int dest_loc = bitset<8> (status.MPI_TAG)           .to_ulong();
214        int src_mpi  = status.MPI_SOURCE;
215        int current_inter = comm->ep_comm_ptr->intercomm->local_rank_map->at(current_ep_rank).first;
216             
217        msg->ep_src  = get_ep_rank(comm, src_loc,  src_mpi);       
218        msg->mpi_status = new ::MPI_Status(status); 
219        memcheck("new "<< msg->mpi_status <<" : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status");
220
221        #pragma omp critical (_query)
222        {
223          #pragma omp flush
224          comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg);
225          memcheck("comm->ep_comm_ptr->comm_list["<<dest_loc<<"]->ep_comm_ptr->message_queue->size = "<<comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->size());
226          #pragma omp flush
227        }
228      }
229    }
230
231    Message_Check_intracomm(comm);
232
233    return MPI_SUCCESS;
234  }
235
236 
237
238}
Note: See TracBrowser for help on using the repository browser.