source: XIOS/dev/branch_openmp/extern/src_ep_dev/ep_message.cpp @ 1520

Last change on this file since 1520 was 1520, checked in by yushan, 6 years ago

save dev. TO DO : test with xios

File size: 7.4 KB
RevLine 
[1134]1/*!
2   \file ep_message.cpp
3   \since 2 may 2016
4
5   \brief Definitions of MPI endpoint function: Message_Check
6 */
7
8#include "ep_lib.hpp"
9#include <mpi.h>
10#include "ep_declaration.hpp"
[1295]11#include "ep_mpi.hpp"
[1134]12
13using namespace std;
14
[1196]15extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
16#pragma omp threadprivate(EP_PendingRequests)
17
[1134]18namespace ep_lib
19{
[1520]20  int Request_Check()
21  {
22    if(EP_PendingRequests == 0 ) EP_PendingRequests = new std::list< MPI_Request* >;
23   
24    if(EP_PendingRequests->size() == 0) return 0;
25   
26    MPI_Status status;
27    MPI_Message *message;
28    int probed = false;
29    int recv_count = 0;
30    std::list<MPI_Request* >::iterator it;
31   
32   
33    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++)
34    { 
35      Message_Check(((*(*it))->comm));
36    }
[1134]37
[1520]38
39    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); )
40    {
41      if((*(*it))->state == 0)
42      {
43        #pragma omp critical (_query0)
44        {
45          MPI_Iprobe((*(*it))->ep_src, (*(*it))->ep_tag, ((*(*it))->comm), &probed, &status);
46          if(probed)
47          {
48            message = new MPI_Message;
49            *message = new ep_message;
50       
51            memcheck("new "<< message <<" : in ep_lib::Request_Check, message = new MPI_Message");
52            memcheck("new "<< *message <<" : in ep_lib::Request_Check, *message = new ep_message");
53         
54         
55            MPI_Improbe((*(*it))->ep_src, (*(*it))->ep_tag, (*(*it))->comm, &probed, message, &status);
56       
57          }
58        }
59     
60       
61        if(probed)
62        {
63          MPI_Get_count(&status, (*(*it))->ep_datatype, &recv_count);
64         
65          MPI_Imrecv((*(*it))->buf, recv_count, (*(*it))->ep_datatype, message, *it);
66          (*(*it))->type = 3;
67          (*(*it))->state = 1;
68
69          memcheck("delete "<< status.mpi_status <<" : in ep_lib::Request_Check, delete status.mpi_status");
70          delete status.mpi_status;         
71
72          memcheck("delete "<< *message <<" : in ep_lib::Request_Check, delete *message");
73          memcheck("delete "<< message <<" : in ep_lib::Request_Check, delete message");
74
75          delete *message;
76          delete message;
77       
78          it++;
79          continue;     
80        }             
81      }
82     
83      if((*(*it))->state == 2)
84      {
85        int ep_rank = ((*(*it))->comm)->ep_comm_ptr->size_rank_info[0].first;
86        memcheck("delete "<< (*(*it)) <<" : in ep_lib::Request_Check, delete (*(*it))");
87       
88       
89        int world_rank;
90        MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
91        if(world_rank==2) 
92        {
93          printf("ep %d erased one pending request %p\n", world_rank,*(*it));
94        }
95       
96        EP_PendingRequests->erase(it);
97       
98        memcheck("EP_PendingRequests["<<ep_rank<<"]->size() = " << EP_PendingRequests->size());
99        it = EP_PendingRequests->begin();
100        continue;
101      }
102      else it++;
103    }
104  }
105 
106 
107 
[1134]108  int Message_Check(MPI_Comm comm)
109  {
[1520]110    if(!comm->is_ep) return MPI_SUCCESS;
[1134]111
[1520]112    if(comm->is_intercomm)
[1134]113    {
[1520]114      Message_Check_intercomm(comm);
[1134]115    }
[1520]116   
117    return Message_Check_intracomm(comm);
[1134]118
[1520]119  }
120 
121 
122  int Message_Check_intracomm(MPI_Comm comm)
123  {
124   
[1134]125    int flag = true;
[1520]126    ::MPI_Status status;
[1134]127    ::MPI_Message message;
128
129    while(flag) // loop until the end of global queue
130    {
131      Debug("Message probing for intracomm\n");
[1520]132     
[1134]133      #pragma omp critical (_mpi_call)
134      {
[1520]135        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->mpi_comm), &flag, &status);
[1134]136        if(flag)
137        {
138          Debug("find message in mpi comm \n");
[1520]139          ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->mpi_comm), &message, &status);
[1134]140        }
141      }
[1374]142
[1176]143     
[1134]144      if(flag)
145      {
[1520]146        MPI_Message msg = new ep_message; 
147        msg->mpi_message = new ::MPI_Message(message);
[1134]148
[1520]149        memcheck("new "<< msg <<" : in ep_lib::Message_Check, msg = new ep_message");
150        memcheck("new "<< msg->mpi_message <<" : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message");
151             
152
153        msg->ep_tag  = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
154        int src_loc  = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
155        int dest_loc = bitset<8> (status.MPI_TAG)           .to_ulong();
156        int src_mpi  = status.MPI_SOURCE;
[1134]157             
[1520]158        msg->ep_src  = get_ep_rank(comm, src_loc,  src_mpi); 
[1134]159
[1520]160#ifdef _showinfo
161        printf("status.MPI_TAG = %d, src_loc = %d, dest_loc = %d, ep_tag = %d\n", status.MPI_TAG, src_loc, dest_loc, msg->ep_tag);
162#endif
[1134]163
[1520]164        msg->mpi_status = new ::MPI_Status(status); 
165        memcheck("new "<< msg->mpi_status <<" : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status");
[1134]166
167        #pragma omp critical (_query)
168        {
169          #pragma omp flush
[1520]170          comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg); 
171          int dest_mpi = comm->ep_comm_ptr->size_rank_info[2].first;
172          memcheck("message_queue["<<dest_mpi<<","<<dest_loc<<"]->size = "<<comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->size());
[1134]173          #pragma omp flush
174        }
175      }
176    }
177
178    return MPI_SUCCESS;
179  }
180
181
[1520]182 
183 
184 
[1134]185
[1520]186
[1134]187  int Message_Check_intercomm(MPI_Comm comm)
188  {
[1520]189    if(!comm->ep_comm_ptr->intercomm->mpi_inter_comm) return 0;
[1134]190
191    Debug("Message probing for intercomm\n");
192
193    int flag = true;
194    ::MPI_Message message;
195    ::MPI_Status status;
196    int current_ep_rank;
197    MPI_Comm_rank(comm, &current_ep_rank);
198
[1520]199    while(flag) // loop until the end of global queue "comm->ep_comm_ptr->intercomm->mpi_inter_comm"
[1134]200    {
201      Debug("Message probing for intracomm\n");
[1295]202
[1134]203      #pragma omp critical (_mpi_call)
204      {
[1520]205        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status);
[1134]206        if(flag)
207        {
208          Debug("find message in mpi comm \n");
[1520]209          ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status);
[1134]210        }
211      }
[1355]212     
[1347]213
[1134]214      if(flag)
215      {
216
[1520]217        MPI_Message msg = new ep_message; 
218        msg->mpi_message = new ::MPI_Message(message);
[1134]219
[1520]220        memcheck("new "<< msg <<" : in ep_lib::Message_Check, msg = new ep_message");
221        memcheck("new "<< msg->mpi_message <<" : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message");
222             
[1134]223
[1520]224        msg->ep_tag  = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 
225        int src_loc  = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 
226        int dest_loc = bitset<8> (status.MPI_TAG)           .to_ulong();
227        int src_mpi  = status.MPI_SOURCE;
228             
229        msg->ep_src  = get_ep_rank(comm, src_loc,  src_mpi);   
230#ifdef _showinfo
231        printf("status.MPI_TAG = %d, src_loc = %d, dest_loc = %d, ep_tag = %d\n", status.MPI_TAG, src_loc, dest_loc, msg->ep_tag);
232#endif
[1134]233
[1520]234        msg->mpi_status = new ::MPI_Status(status); 
235        memcheck("new "<< msg->mpi_status <<" : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status");
[1134]236
237        #pragma omp critical (_query)
238        {
239          #pragma omp flush
[1520]240          comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg);
241          memcheck("comm->ep_comm_ptr->comm_list["<<dest_loc<<"]->ep_comm_ptr->message_queue->size = "<<comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->size());
[1134]242          #pragma omp flush
243        }
244      }
245    }
246
[1520]247    Message_Check_intracomm(comm);
[1134]248
249    return MPI_SUCCESS;
250  }
251
[1520]252 
[1196]253
[1134]254}
Note: See TracBrowser for help on using the repository browser.