source: XIOS/dev/branch_openmp/extern/src_ep_dev/ep_probe.cpp @ 2022

Last change on this file since 2022 was 1642, checked in by yushan, 5 years ago

dev on ADA. add flag switch _usingEP/_usingMPI

File size: 8.5 KB
RevLine 
[1134]1#include "ep_lib.hpp"
2#include <mpi.h>
3#include "ep_declaration.hpp"
[1295]4#include "ep_mpi.hpp"
[1134]5
6namespace ep_lib
7{
[1520]8  int MPI_Iprobe_mpi(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
9  {
[1539]10    ::MPI_Status mpi_status;
11
12    ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
13
14    status->mpi_status = new ::MPI_Status(mpi_status);
[1520]15    status->ep_src = src;
16    status->ep_tag = tag;
17  }
[1134]18
[1539]19
20  int MPI_Improbe_mpi(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
21  {
22    ::MPI_Status mpi_status;
23    ::MPI_Message mpi_message;
24
25    #ifdef _openmpi
26    #pragma omp critical (_mpi_call)
27    {
28      ::MPI_Iprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_status);
29      if(*flag)
30      {
31        ::MPI_Mprobe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), &mpi_message, &mpi_status);
32      }
33    }
34    #elif _intelmpi
35    ::MPI_Improbe(src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, to_mpi_comm(comm->mpi_comm), flag, &mpi_message, &mpi_status);
36    #endif
37     
38    status->mpi_status = new ::MPI_Status(mpi_status);
39    status->ep_src = src;
40    status->ep_tag = tag;
41
42    (*message)->mpi_message = &message;
43    (*message)->ep_src = src;
44    (*message)->ep_tag = tag;
45  }
46
47
[1134]48  int MPI_Iprobe(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
49  {
[1520]50    if(!comm->is_ep)
[1134]51    {
[1520]52      Debug("MPI_Iprobe with MPI\n");
53      return MPI_Iprobe_mpi(src, tag, comm, flag, status);
[1134]54    }
[1539]55
56    if(comm->is_intercomm)
[1520]57    {
[1539]58      if(src>=0) src = comm->inter_rank_map->at(src);
59    } 
[1520]60   
[1539]61    return MPI_Iprobe_endpoint(src, tag, comm, flag, status);
62  }
[1134]63
[1539]64  int MPI_Iprobe_endpoint(int src, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
65  {
66    Debug("MPI_Iprobe with EP\n");
[1134]67
[1539]68    *flag = false;
69   
[1642]70    #pragma omp critical (_query)
71    for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
72    {
73      bool src_matched = src<0? true: (*it)->ep_src == src;
74      bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
75     
76      if(src_matched && tag_matched)       
77      {
78        Debug("find message\n");
79         
80        status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
81        status->ep_src = (*it)->ep_src;
82        status->ep_tag = (*it)->ep_tag;
83       
84        if(comm->is_intercomm)
85        {
86          for(INTER_RANK_MAP::iterator iter = comm->inter_rank_map->begin(); iter != comm->inter_rank_map->end(); iter++)
87          {
88            if(iter->second == (*it)->ep_src) status->ep_src=iter->first;
89          }
90        }
91
92        *flag = true;
93        break;
94      }
95    }
96    if(*flag) return 0;
97   
[1539]98    Message_Check(comm);
99
100    #pragma omp flush
101
102    #pragma omp critical (_query)
103    for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
104    {
105      bool src_matched = src<0? true: (*it)->ep_src == src;
106      bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
107     
108      if(src_matched && tag_matched)       
[1134]109      {
[1539]110        Debug("find message\n");
111         
112        status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
113        status->ep_src = (*it)->ep_src;
114        status->ep_tag = (*it)->ep_tag;
[1134]115       
[1539]116        if(comm->is_intercomm)
[1134]117        {
[1539]118          for(INTER_RANK_MAP::iterator iter = comm->inter_rank_map->begin(); iter != comm->inter_rank_map->end(); iter++)
119          {
120            if(iter->second == (*it)->ep_src) status->ep_src=iter->first;
121          }
122        }
[1134]123
[1539]124        *flag = true;
125        break;
[1134]126      }
127    }
[1642]128    if(*flag) return 0;
[1134]129  }
130
[1539]131 
[1134]132
133  int MPI_Improbe(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
134  {
[1520]135    if(!comm->is_ep)
[1134]136    {
[1539]137      Debug("MPI_Iprobe with MPI\n");
138      return MPI_Improbe_mpi(src, tag, comm, flag, message, status);
139    }
[1134]140
[1539]141    if(comm->is_intercomm)
142    {
143      src = comm->inter_rank_map->at(src);
144      *message = new ep_message;
145      printf("============= new *message = %p\n", *message);
146    } 
147   
148    return MPI_Improbe_endpoint(src, tag, comm, flag, message, status);
149  }
[1134]150
151
152
[1539]153  int MPI_Improbe_endpoint(int src, int tag, MPI_Comm comm, int *flag, MPI_Message *message, MPI_Status *status)
154  {
155    int ep_rank_loc = comm->ep_comm_ptr->size_rank_info[1].first;
156    int mpi_rank    = comm->ep_comm_ptr->size_rank_info[2].first;
[1134]157
[1539]158    *flag = false;
159   
[1642]160    #pragma omp critical (_query)
161    if(! comm->ep_comm_ptr->message_queue->empty())
162    {
163      for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
164      {
165                                         
166        bool src_matched = src<0? true: (*it)->ep_src == src;
167        bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
168       
169        if(src_matched && tag_matched)
170        {
171          *flag = true;
172
173          status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
174          memcheck("new "<< status->mpi_status << " : in ep_lib::MPI_Improbe, status->mpi_status = new ::MPI_Status");
175          status->ep_src = (*it)->ep_src;
176          status->ep_tag = (*it)->ep_tag;
177
178          (*message)->mpi_message = new ::MPI_Message(*static_cast< ::MPI_Message*>((*it)->mpi_message));
179          memcheck("new "<< (*message)->mpi_message <<" : in ep_lib::MPI_Improbe, (*message)->mpi_message = new ::MPI_Message");
180          (*message)->ep_src = (*it)->ep_src;
181          (*message)->ep_tag = (*it)->ep_tag;
182                                     
183
184          #pragma omp critical (_query2)
185          {             
186            memcheck("delete "<< (*it)->mpi_message <<" : in ep_lib::Message_Check, delete (*it)->mpi_message");
187            memcheck("delete "<< (*it)->mpi_status <<" : in ep_lib::Message_Check, delete (*it)->mpi_status");
188            memcheck("delete "<< (*it) <<" : in ep_lib::Message_Check, delete (*it)");
189           
190           
191            delete (*it)->mpi_message;     
192            delete (*it)->mpi_status; 
193            delete *it;
194           
195                       
196            comm->ep_comm_ptr->message_queue->erase(it);
197            memcheck("message_queue["<<mpi_rank<<","<<ep_rank_loc<<"]->size = "<<comm->ep_comm_ptr->message_queue->size());
198            #pragma omp flush
199          }
200         
201          break;
202        }
203
204      }
205    }
206
207    if(*flag) return 0;
208   
[1539]209    Message_Check(comm);
210   
[1134]211    #pragma omp flush
212
213    #pragma omp critical (_query)
[1520]214    if(! comm->ep_comm_ptr->message_queue->empty())
[1134]215    {
[1520]216      for(Message_list::iterator it = comm->ep_comm_ptr->message_queue->begin(); it!= comm->ep_comm_ptr->message_queue->end(); ++it)
[1134]217      {
[1520]218                                         
219        bool src_matched = src<0? true: (*it)->ep_src == src;
220        bool tag_matched = tag<0? true: (*it)->ep_tag == tag;
[1134]221       
222        if(src_matched && tag_matched)
223        {
224          *flag = true;
225
[1520]226          status->mpi_status = new ::MPI_Status(*static_cast< ::MPI_Status*>((*it)->mpi_status));
227          memcheck("new "<< status->mpi_status << " : in ep_lib::MPI_Improbe, status->mpi_status = new ::MPI_Status");
228          status->ep_src = (*it)->ep_src;
229          status->ep_tag = (*it)->ep_tag;
[1134]230
[1520]231          (*message)->mpi_message = new ::MPI_Message(*static_cast< ::MPI_Message*>((*it)->mpi_message));
232          memcheck("new "<< (*message)->mpi_message <<" : in ep_lib::MPI_Improbe, (*message)->mpi_message = new ::MPI_Message");
233          (*message)->ep_src = (*it)->ep_src;
234          (*message)->ep_tag = (*it)->ep_tag;
235                                     
[1134]236
237          #pragma omp critical (_query2)
238          {             
[1520]239            memcheck("delete "<< (*it)->mpi_message <<" : in ep_lib::Message_Check, delete (*it)->mpi_message");
240            memcheck("delete "<< (*it)->mpi_status <<" : in ep_lib::Message_Check, delete (*it)->mpi_status");
241            memcheck("delete "<< (*it) <<" : in ep_lib::Message_Check, delete (*it)");
242           
243           
244            delete (*it)->mpi_message;     
245            delete (*it)->mpi_status; 
246            delete *it;
247           
248                       
249            comm->ep_comm_ptr->message_queue->erase(it);
250            memcheck("message_queue["<<mpi_rank<<","<<ep_rank_loc<<"]->size = "<<comm->ep_comm_ptr->message_queue->size());
[1134]251            #pragma omp flush
252          }
[1520]253         
[1134]254          break;
255        }
256
257      }
258    }
[1642]259
260    if(*flag) return 0;
261
[1134]262  }
263
264}
265
[1539]266
Note: See TracBrowser for help on using the repository browser.