source: XIOS/dev/branch_openmp/extern/ep_dev/ep_message.cpp @ 1515

Last change on this file since 1515 was 1515, checked in by yushan, 6 years ago

save dev

File size: 7.3 KB
RevLine 
[1381]1/*!
2   \file ep_message.cpp
3   \since 2 may 2016
4
5   \brief Definitions of MPI endpoint function: Message_Check
6 */
7
8#include "ep_lib.hpp"
9#include <mpi.h>
10#include "ep_declaration.hpp"
11#include "ep_mpi.hpp"
12
13using namespace std;
14
15extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
16#pragma omp threadprivate(EP_PendingRequests)
17
18namespace ep_lib
19{
[1500]20  int Request_Check()
21  {
[1503]22    if(EP_PendingRequests == 0 ) EP_PendingRequests = new std::list< MPI_Request* >;
23   
24    if(EP_PendingRequests->size() == 0) return 0;
25   
[1500]26    MPI_Status status;
27    MPI_Message *message;
28    int probed = false;
29    int recv_count = 0;
30    std::list<MPI_Request* >::iterator it;
31   
32   
33    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++)
34    { 
35      Message_Check(((*(*it))->comm));
36    }
[1381]37
[1500]38
39    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); )
40    {
41      if((*(*it))->state == 0)
42      {
[1503]43        #pragma omp critical (_query0)
44        {
45          MPI_Iprobe((*(*it))->ep_src, (*(*it))->ep_tag, ((*(*it))->comm), &probed, &status);
46          if(probed)
47          {
48            message = new MPI_Message;
49            *message = new ep_message;
[1500]50       
[1503]51            memcheck("new "<< message <<" : in ep_lib::Request_Check, message = new MPI_Message");
52            memcheck("new "<< *message <<" : in ep_lib::Request_Check, *message = new ep_message");
[1500]53         
54         
[1503]55            MPI_Improbe((*(*it))->ep_src, (*(*it))->ep_tag, (*(*it))->comm, &probed, message, &status);
56       
57          }
58        }
[1500]59     
60       
61        if(probed)
62        {
63          ::MPI_Get_count(to_mpi_status_ptr(status), to_mpi_type((*(*it))->ep_datatype), &recv_count);
64         
65          MPI_Imrecv((*(*it))->buf, recv_count, (*(*it))->ep_datatype, message, *it);
66          (*(*it))->type = 3;
67          (*(*it))->state = 1;
68
[1503]69          memcheck("delete "<< status.mpi_status <<" : in ep_lib::Request_Check, delete status.mpi_status");
70          delete status.mpi_status;         
[1500]71
[1503]72          memcheck("delete "<< *message <<" : in ep_lib::Request_Check, delete *message");
73          memcheck("delete "<< message <<" : in ep_lib::Request_Check, delete message");
74
[1500]75          delete *message;
76          delete message;
77       
78          it++;
79          continue;     
[1503]80        }             
[1500]81      }
82     
83      if((*(*it))->state == 2)
84      {
[1503]85        int ep_rank = ((*(*it))->comm)->ep_comm_ptr->size_rank_info[0].first;
86        memcheck("delete "<< (*(*it)) <<" : in ep_lib::Request_Check, delete (*(*it))");
[1500]87        delete (*(*it));
88       
89        EP_PendingRequests->erase(it);
[1503]90        memcheck("EP_PendingRequests["<<ep_rank<<"]->size() = " << EP_PendingRequests->size());
[1500]91        it = EP_PendingRequests->begin();
92        continue;
93      }
94      else it++;
95    }
96  }
97 
98 
99 
[1381]100  int Message_Check(MPI_Comm comm)
101  {
[1500]102    if(!comm->is_ep) return MPI_SUCCESS;
[1381]103
[1500]104    if(comm->is_intercomm)
[1381]105    {
[1515]106      Message_Check_intercomm(comm);
[1381]107    }
[1500]108   
109    return Message_Check_intracomm(comm);
[1381]110
[1500]111  }
112 
113 
114  int Message_Check_intracomm(MPI_Comm comm)
115  {
116   
[1381]117    int flag = true;
[1500]118    ::MPI_Status status;
[1381]119    ::MPI_Message message;
120
121    while(flag) // loop until the end of global queue
122    {
123      Debug("Message probing for intracomm\n");
[1500]124     
[1381]125      #pragma omp critical (_mpi_call)
126      {
[1500]127        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->mpi_comm), &flag, &status);
[1381]128        if(flag)
129        {
130          Debug("find message in mpi comm \n");
[1500]131          ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->mpi_comm), &message, &status);
[1381]132        }
133      }
134
135     
136      if(flag)
137      {
[1500]138        MPI_Message msg = new ep_message; 
139        msg->mpi_message = new ::MPI_Message(message);
[1381]140
[1503]141        memcheck("new "<< msg <<" : in ep_lib::Message_Check, msg = new ep_message");
142        memcheck("new "<< msg->mpi_message <<" : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message");
[1500]143             
144
145        msg->ep_tag  = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
146        int src_loc  = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
147        int dest_loc = bitset<8> (status.MPI_TAG)           .to_ulong();
148        int src_mpi  = status.MPI_SOURCE;
[1381]149             
[1515]150        msg->ep_src  = get_ep_rank(comm, src_loc,  src_mpi); 
151
152#ifdef _showinfo
153        printf("status.MPI_TAG = %d, src_loc = %d, dest_loc = %d, ep_tag = %d\n", status.MPI_TAG, src_loc, dest_loc, msg->ep_tag);
154#endif
155
[1500]156        msg->mpi_status = new ::MPI_Status(status); 
[1503]157        memcheck("new "<< msg->mpi_status <<" : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status");
[1381]158
159        #pragma omp critical (_query)
160        {
161          #pragma omp flush
[1500]162          comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg); 
[1503]163          int dest_mpi = comm->ep_comm_ptr->size_rank_info[2].first;
164          memcheck("message_queue["<<dest_mpi<<","<<dest_loc<<"]->size = "<<comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->size());
[1381]165          #pragma omp flush
166        }
167      }
168    }
169
170    return MPI_SUCCESS;
171  }
172
173
[1500]174 
175 
176 
[1381]177
[1500]178
[1381]179  int Message_Check_intercomm(MPI_Comm comm)
180  {
[1500]181    if(!comm->ep_comm_ptr->intercomm->mpi_inter_comm) return 0;
[1381]182
183    Debug("Message probing for intercomm\n");
184
185    int flag = true;
186    ::MPI_Message message;
187    ::MPI_Status status;
188    int current_ep_rank;
189    MPI_Comm_rank(comm, &current_ep_rank);
190
[1500]191    while(flag) // loop until the end of global queue "comm->ep_comm_ptr->intercomm->mpi_inter_comm"
[1381]192    {
193      Debug("Message probing for intracomm\n");
194
195      #pragma omp critical (_mpi_call)
196      {
[1500]197        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status);
[1381]198        if(flag)
199        {
200          Debug("find message in mpi comm \n");
[1500]201          ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status);
[1381]202        }
203      }
204     
205
206      if(flag)
207      {
208
[1500]209        MPI_Message msg = new ep_message; 
210        msg->mpi_message = new ::MPI_Message(message);
[1381]211
[1503]212        memcheck("new "<< msg <<" : in ep_lib::Message_Check, msg = new ep_message");
213        memcheck("new "<< msg->mpi_message <<" : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message");
[1500]214             
[1381]215
[1500]216        msg->ep_tag  = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
217        int src_loc  = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
218        int dest_loc = bitset<8> (status.MPI_TAG)           .to_ulong();
219        int src_mpi  = status.MPI_SOURCE;
220             
[1515]221        msg->ep_src  = get_ep_rank(comm, src_loc,  src_mpi);   
222#ifdef _showinfo
223        printf("status.MPI_TAG = %d, src_loc = %d, dest_loc = %d, ep_tag = %d\n", status.MPI_TAG, src_loc, dest_loc, msg->ep_tag);
224#endif
225
[1503]226        msg->mpi_status = new ::MPI_Status(status); 
227        memcheck("new "<< msg->mpi_status <<" : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status");
[1381]228
229        #pragma omp critical (_query)
230        {
231          #pragma omp flush
[1500]232          comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg);
[1503]233          memcheck("comm->ep_comm_ptr->comm_list["<<dest_loc<<"]->ep_comm_ptr->message_queue->size = "<<comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->size());
[1381]234          #pragma omp flush
235        }
236      }
237    }
238
[1500]239    Message_Check_intracomm(comm);
[1381]240
241    return MPI_SUCCESS;
242  }
243
[1500]244 
[1381]245
246}
Note: See TracBrowser for help on using the repository browser.