source: XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_message.cpp @ 1196

Last change on this file since 1196 was 1196, checked in by yushan, 4 years ago

add request_check. test client and complete OK

File size: 9.7 KB
Line 
1/*!
2   \file ep_message.cpp
3   \since 2 may 2016
4
5   \brief Definitions of MPI endpoint function: Message_Check
6 */
7
8#include "ep_lib.hpp"
9#include <mpi.h>
10#include "ep_declaration.hpp"
11
12using namespace std;
13
14extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
15#pragma omp threadprivate(EP_PendingRequests)
16
17namespace ep_lib
18{
19
20  int Message_Check(MPI_Comm comm)
21  {
22    int myRank;
23    MPI_Comm_rank(comm, &myRank);
24
25    if(!comm.is_ep) return 0;
26
27    if(comm.is_intercomm)
28    {
29      return  Message_Check_intercomm(comm);
30    }
31
32    int flag = true;
33    ::MPI_Message message;
34    ::MPI_Status status;
35    int mpi_source;
36
37    while(flag) // loop until the end of global queue
38    {
39      Debug("Message probing for intracomm\n");
40      ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm> (comm.mpi_comm);
41      #ifdef _openmpi
42      #pragma omp critical (_mpi_call)
43      {
44        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &status);
45        if(flag)
46        {
47          Debug("find message in mpi comm \n");
48          mpi_source = status.MPI_SOURCE;
49          int tag = status.MPI_TAG;
50          ::MPI_Mprobe(mpi_source, tag, mpi_comm, &message, &status);
51
52        }
53      }
54      #elif _intelmpi
55      //#pragma omp critical (_mpi_call)
56      //::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &message, &status);
57      #pragma omp critical (_mpi_call)
58      {
59        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &status);
60        if(flag)
61        {
62          Debug("find message in mpi comm \n");
63          mpi_source = status.MPI_SOURCE;
64          int tag = status.MPI_TAG;
65          ::MPI_Mprobe(mpi_source, tag, mpi_comm, &message, &status);
66
67        }
68      }
69      #endif
70     
71      if(flag)
72      {
73
74        MPI_Message *msg_block = new MPI_Message; //printf("myRank = %d, new ok\n", myRank);
75        msg_block->mpi_message = message;  //printf("myRank = %d, msg_block->mpi_message = %d\n", myRank, msg_block->mpi_message);
76        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();  //printf("myRank = %d, msg_block->ep_tag = %d\n", myRank, msg_block->ep_tag);
77        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong();  //printf("myRank = %d, src_loc = %d\n",           myRank, src_loc);
78        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();  //printf("myRank = %d, dest_loc = %d\n",          myRank, dest_loc);
79        int src_mpi       = status.MPI_SOURCE;                            //printf("myRank = %d, src_mpi = %d\n",           myRank, src_mpi);
80             
81        msg_block->ep_src  = get_ep_rank(comm, src_loc,  src_mpi);        //printf("myRank = %d, msg_block->ep_src = %d\n",  myRank, msg_block->ep_src);
82        int dest_mpi = comm.ep_comm_ptr->size_rank_info[2].first;
83        int ep_dest = get_ep_rank(comm, dest_loc, dest_mpi);
84        //printf("myRank = %d, probed one message, ep_src = %d, ep_dest = %d, tag = %d, message = %d\n", myRank, msg_block->ep_src, ep_dest, msg_block->ep_tag, msg_block->mpi_message);
85        msg_block->mpi_status = new ::MPI_Status(status);
86
87        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
88        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
89
90
91        #pragma omp critical (_query)
92        {
93          #pragma omp flush
94          ptr_comm_target->ep_comm_ptr->message_queue->push_back(*msg_block); 
95          //printf("myRank = %d, push_back OK, ep_src = %d, ep_tag = %d, dest = %d(%d)\n", myRank,
96          //                                                                                   ptr_comm_target->ep_comm_ptr->message_queue->back().ep_src,
97          //                                                                                   ptr_comm_target->ep_comm_ptr->message_queue->back().ep_tag,
98          //                                                                                   ep_dest, dest_loc);
99   
100          #pragma omp flush
101        }
102       
103        delete msg_block;
104        //printf("myRank = %d, delete msg_block, queue size = %lu\n", myRank, ptr_comm_target->ep_comm_ptr->message_queue->size());
105       
106      }
107
108    }
109
110    return MPI_SUCCESS;
111  }
112
113
114
115  int Message_Check_intercomm(MPI_Comm comm)
116  {
117    if(!comm.ep_comm_ptr->intercomm->mpi_inter_comm) return 0;
118
119    Debug("Message probing for intercomm\n");
120
121    int flag = true;
122    ::MPI_Message message;
123    ::MPI_Status status;
124    int mpi_source;
125    int current_ep_rank;
126    MPI_Comm_rank(comm, &current_ep_rank);
127
128    while(flag) // loop until the end of global queue "comm.ep_comm_ptr->intercomm->mpi_inter_comm"
129    {
130      Debug("Message probing for intracomm\n");
131      ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm> (comm.ep_comm_ptr->intercomm->mpi_inter_comm);  // => mpi_intercomm
132     
133      #ifdef _openmpi
134      #pragma omp critical (_mpi_call)
135      {
136        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &status);
137        if(flag)
138        {
139          Debug("find message in mpi comm \n");
140          mpi_source = status.MPI_SOURCE;
141          int tag = status.MPI_TAG;
142          ::MPI_Mprobe(mpi_source, tag, mpi_comm, &message, &status);
143
144        }
145      }
146      #elif _intelmpi
147      #pragma omp critical (_mpi_call)
148      {
149        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &status);
150        if(flag)
151        {
152          Debug("find message in mpi comm \n");
153          mpi_source = status.MPI_SOURCE;
154          int tag = status.MPI_TAG;
155          ::MPI_Mprobe(mpi_source, tag, mpi_comm, &message, &status);
156
157        }
158      }
159      //::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &message, &status);       
160      #endif
161
162      if(flag)
163      {
164
165        MPI_Message *msg_block = new MPI_Message;
166
167        msg_block->mpi_message = message;
168        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();
169        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong();
170        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
171        int src_mpi       = status.MPI_SOURCE;
172        int current_inter = comm.ep_comm_ptr->intercomm->local_rank_map->at(current_ep_rank).first;
173             
174        msg_block->ep_src  = get_ep_rank_intercomm(comm, src_loc,  src_mpi);
175        msg_block->mpi_status = new ::MPI_Status(status);
176
177
178        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
179        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
180
181
182        #pragma omp critical (_query)
183        {
184          #pragma omp flush
185          ptr_comm_target->ep_comm_ptr->message_queue->push_back(*msg_block);
186          #pragma omp flush
187        }
188       
189        delete msg_block;
190       
191      }
192
193    }
194
195    flag = true;
196    while(flag) // loop until the end of global queue "comm.mpi_comm"
197    {
198      Debug("Message probing for intracomm\n");
199      ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm> (comm.mpi_comm);
200      #ifdef _openmpi
201      #pragma omp critical (_mpi_call)
202      {
203        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &status);
204        if(flag)
205        {
206          Debug("find message in mpi comm \n");
207          mpi_source = status.MPI_SOURCE;
208          int tag = status.MPI_TAG;
209          ::MPI_Mprobe(mpi_source, tag, mpi_comm, &message, &status);
210
211        }
212      }
213      #elif _intelmpi
214      #pragma omp critical (_mpi_call)
215      {
216        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &status);
217        if(flag)
218        {
219          Debug("find message in mpi comm \n");
220          mpi_source = status.MPI_SOURCE;
221          int tag = status.MPI_TAG;
222          ::MPI_Mprobe(mpi_source, tag, mpi_comm, &message, &status);
223
224        }
225      }
226      //::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &message, &status);       
227      #endif
228
229      if(flag)
230      {
231
232        MPI_Message *msg_block = new MPI_Message;
233       
234        msg_block->mpi_message = message;
235        msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();
236        int src_loc       = bitset<8> (status.MPI_TAG >> 8) .to_ulong();
237        int dest_loc      = bitset<8> (status.MPI_TAG)      .to_ulong();
238        int src_mpi       = status.MPI_SOURCE;
239        int current_inter = comm.ep_comm_ptr->intercomm->local_rank_map->at(current_ep_rank).first;
240       
241        msg_block->ep_src  = get_ep_rank_intercomm(comm, src_loc, src_mpi);
242        msg_block->mpi_status = new ::MPI_Status(status);
243       
244
245        MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;
246        MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];
247
248
249        #pragma omp critical (_query)
250        {
251          #pragma omp flush
252          ptr_comm_target->ep_comm_ptr->message_queue->push_back(*msg_block);
253          //printf("probed one message, ep_src = %d, tag = %d, mpi_status = %p (%p), message = %d\n", msg_block->ep_src, msg_block->ep_tag, msg_block->mpi_status, &status, msg_block->mpi_message);
254          #pragma omp flush
255        }
256       
257        delete msg_block;
258       
259      }
260
261    }
262
263    return MPI_SUCCESS;
264  }
265
266  int Request_Check()
267  {
268    MPI_Status status;
269    MPI_Message message;
270    int probed = false;
271    int recv_count = 0;
272    std::list<MPI_Request* >::iterator it;
273   
274    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++)
275    { 
276      Message_Check((*it)->comm);
277    }
278
279
280    for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); )
281    {
282      MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status);
283      if(probed)
284      {
285        MPI_Get_count(&status, (*it)->ep_datatype, &recv_count);
286        MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it);
287        (*it)->type = 3;
288        //printf("request add = %p, mpi_request=%d\n", *it, (*it)->mpi_request);
289        EP_PendingRequests->erase(it);
290        it = EP_PendingRequests->begin();
291        continue;
292      }
293      it++;
294    }
295  }
296
297}
Note: See TracBrowser for help on using the repository browser.